jdrago999/distribot

View on GitHub
examples/worker

Summary

Maintainability
Test Coverage
#!/usr/bin/env ruby

require 'bundler/setup'
require 'distribot'
require 'byebug'
require 'pp'
require 'active_support/core_ext/object'
require 'active_support/core_ext/array'
require 'active_support/json'

Distribot.configure do |config|
  config.redis_url = ENV['DISTRIBOT_REDIS_URL']
  config.rabbitmq_url = ENV['DISTRIBOT_RABBITMQ_URL']
end

class SimpleWorker
  include Distribot::Worker
  version '1.0.0'
  enumerate_with :enumerate
  process_tasks_with :process

  def enumerate(_context, &callback)
    jobs = [ ]
    2.times do |chunk|
      max = 5
      jobs << (1..max).to_a.map{|n| {id: SecureRandom.uuid, args: [chunk, n] } }
    end
    callback.call(jobs.flatten)
  end

  def process(_context, job)
    logger.info job.to_s
    raise "Test Error!" if rand >= 0.9
    job_time = Distribot.redis.get('difficulty').to_f
    sleep job_time <= 0 ? 3 : job_time
  end
end

class HardWorker < SimpleWorker;
  version '1.1.0'
  enumerate_with :enumerate
  process_tasks_with :process
end
class GoodWorker < SimpleWorker;
  version '1.0.0'
  enumerate_with :enumerate
  process_tasks_with :process
end
class FastWorker < SimpleWorker;
  version '1.0.0'
  enumerate_with :enumerate
  process_tasks_with :process
end
class CheapWorker < SimpleWorker;
  version '1.0.0'
  enumerate_with :enumerate
  process_tasks_with :process
end
class ForeignWorker < SimpleWorker;
  version '1.0.0'
  enumerate_with :enumerate
  process_tasks_with :process
end
class SlowWorker < SimpleWorker;
  version '1.0.0'
  enumerate_with :enumerate
  process_tasks_with :process
end

module News
  class ArticleLister
    include Distribot::Worker
    version '1.0.1'
    enumerate_with :enumerate
    process_tasks_with :process
    def enumerate(context, &callback)
      @enumerations ||= 0
      story_ids = (1..20).to_a
      tasks = story_ids.map { |id| {story_id: "#{@enumerations}.#{id}"} }
      @enumerations += 1
      return tasks
    end
    def process(context, task)
      json = { url: "https://infra/?story_id=#{task[:story_id]}" }.to_json
      data_key = "flow.#{context.flow_id}.stories"
      parsed = JSON.parse(json, symbolize_names: true) rescue {}
      if parsed.key? :url
        Distribot.redis.multi do |redis|
          place = [data_key, task[:story_id]].join(':')
          redis.set place, json
          redis.sadd data_key, place
        end
      end
    end
  end

  class ArticleDownloaderWorker
    include Distribot::Worker
    require 'wrest'
    version '1.0.0'
    enumerate_with :enumerate
    process_tasks_with :process

    def enumerate(context)
      data_key = "flow.#{context.flow_id}.stories"
      tasks = Distribot.redis.smembers(data_key).map { |location| {data_locator: location} }
      Distribot.redis.del data_key
      return tasks
    end

    def process(context, task)
      data = JSON.parse(Distribot.redis.get(task[:data_locator]), symbolize_names: true)
puts data
#      data[:url].to_uri(verify_mode: OpenSSL::SSL::VERIFY_NONE).get
      Distribot.redis.del(task[:data_locator])
    end
  end
end

puts News::ArticleLister.new.run
puts News::ArticleDownloaderWorker.new.run

# Distribot.logger.info HardWorker.new.run
# Distribot.logger.info GoodWorker.new.run
# Distribot.logger.info FastWorker.new.run
# Distribot.logger.info CheapWorker.new.run
# Distribot.logger.info ForeignWorker.new.run
# Distribot.logger.info SlowWorker.new.run

puts "Worker up and running!"

sleep