dtaniwaki/sidekiq-merger

View on GitHub
lib/sidekiq/merger/merge.rb

Summary

Maintainability
A
25 mins
Test Coverage
require_relative "redis"
require "active_support/core_ext/hash/indifferent_access"

# A single merge bucket, identified by worker class, queue and merge key.
# All storage operations are delegated to a Sidekiq::Merger::Redis wrapper.
class Sidekiq::Merger::Merge
  class << self
    # Builds a Merge for every full merge key reported by the Redis wrapper.
    # The same wrapper instance is shared by all returned merges.
    #
    # @return [Array<Sidekiq::Merger::Merge>]
    def all
      redis = Sidekiq::Merger::Redis.new

      redis.all_merges.map { |full_merge_key| initialize_with_full_merge_key(full_merge_key, redis: redis) }
    end

    # Rebuilds a Merge from a key in the format produced by #full_merge_key:
    # "<underscored worker name>:<queue>:<merge key>".
    #
    # @param full_merge_key [String]
    # @param options [Hash] symbol-keyed options forwarded to .new as keywords (e.g. :redis)
    # @return [Sidekiq::Merger::Merge]
    # @raise [RuntimeError] when the key has fewer than three segments
    def initialize_with_full_merge_key(full_merge_key, options = {})
      # Split into at most three parts: colons inside the merge key (for
      # example a JSON-encoded key) must stay part of it, and an empty
      # trailing merge key must be kept instead of being dropped.
      worker_name, queue, merge_key = full_merge_key.split(":", 3)
      raise "Invalid merge key" if merge_key.nil?

      # Forward options as keywords; a positional Hash is rejected by
      # #initialize under Ruby 3 keyword-argument semantics.
      new(worker_name.camelize.constantize, queue, merge_key, **options)
    end

    # Builds a Merge whose merge key is derived from the job arguments.
    #
    # @param worker_class [Class] worker responding to get_sidekiq_options
    # @param queue [String]
    # @param args [Object] job arguments handed to the configured key callable
    # @param options [Hash] symbol-keyed options forwarded to .new as keywords (e.g. :redis)
    # @return [Sidekiq::Merger::Merge]
    def initialize_with_args(worker_class, queue, args, options = {})
      new(worker_class, queue, merge_key(worker_class, args), **options)
    end

    # Resolves the merge key from the worker's "merger" options.
    #
    # The "key" option may be a callable (invoked with +args+) or a plain
    # value. nil becomes "", and any non-String value is serialized with
    # to_json.
    #
    # @param worker_class [Class]
    # @param args [Object]
    # @return [String]
    def merge_key(worker_class, args)
      key = get_options(worker_class)["key"]
      key = key.call(args) if key.respond_to?(:call)
      return "" if key.nil?

      key.is_a?(String) ? key : key.to_json
    end

    # Reads the "merger" entry of the worker's sidekiq options.
    #
    # @param worker_class [Class]
    # @return [ActiveSupport::HashWithIndifferentAccess] empty when "merger" is not set
    def get_options(worker_class)
      (worker_class.get_sidekiq_options["merger"] || {}).with_indifferent_access
    end
  end

  attr_reader :worker_class, :queue, :merge_key

  # @param worker_class [Class]
  # @param queue [String]
  # @param merge_key [String]
  # @param redis [Sidekiq::Merger::Redis] storage wrapper; a new one is created by default
  def initialize(worker_class, queue, merge_key, redis: Sidekiq::Merger::Redis.new)
    @worker_class = worker_class
    @queue = queue
    @merge_key = merge_key
    @redis = redis
  end

  # Pushes +args+ into this merge. When the worker's :unique option is set,
  # the push is skipped if the Redis wrapper reports the args already exist.
  #
  # @param args [Object]
  # @param execution_time [Object] passed through to the Redis wrapper
  def add(args, execution_time)
    return if options[:unique] && @redis.merge_exists?(full_merge_key, args)

    @redis.push_message(full_merge_key, args, execution_time)
  end

  # Asks the Redis wrapper to delete the message matching +args+.
  def delete(args)
    @redis.delete_message(full_merge_key, args)
  end

  # Asks the Redis wrapper to delete this whole merge.
  def delete_all
    @redis.delete_merge(full_merge_key)
  end

  # @return the merge size as reported by the Redis wrapper
  def size
    @redis.merge_size(full_merge_key)
  end

  # Plucks the pending messages (only if the merge lock is obtained) and
  # enqueues them through Sidekiq::Client with "merged" => true, either as
  # one job or in slices of the worker's :batch_size option.
  #
  # @return [Array, nil] the enqueued batches, or nil when nothing was flushed
  def flush
    return unless @redis.lock_merge(full_merge_key, Sidekiq::Merger::Config.lock_ttl)

    msgs = @redis.pluck_merge(full_merge_key)
    return if msgs.empty?

    # A missing, zero, negative or non-numeric batch size means "no batching".
    # each_slice raises ArgumentError for sizes below 1, which would abort
    # the flush after the messages were already plucked.
    batch_size = options[:batch_size].to_i
    batches = batch_size.positive? ? msgs.each_slice(batch_size).to_a : [msgs]

    batches.each do |batch_msgs|
      # preserve FIFO when enqueuing batches
      Sidekiq::Client.push(
        "class" => worker_class,
        "queue" => queue,
        "args" => batch_msgs,
        "merged" => true
      )
    end
  end

  # @return [Boolean] true when an execution time is known and already in the past
  def can_flush?
    !execution_time.nil? && execution_time < Time.now
  end

  # @return [String] "<underscored worker name>:<queue>:<merge key>" (memoized)
  def full_merge_key
    @full_merge_key ||= [worker_class.name.to_s.underscore, queue, merge_key].join(":")
  end

  # @return the stored messages for this merge as returned by the Redis wrapper
  def all_args
    @redis.get_merge(full_merge_key)
  end

  # Memoized once a non-nil value has been read from the Redis wrapper.
  def execution_time
    @execution_time ||= @redis.merge_execution_time(full_merge_key)
  end

  # Two merges are equal when worker class, queue and merge key all match.
  # Objects that are not merges are never equal (and no longer raise).
  #
  # @param other [Object]
  # @return [Boolean]
  def ==(other)
    other.is_a?(self.class) &&
      worker_class == other.worker_class &&
      queue == other.queue &&
      merge_key == other.merge_key
  end

  private

  # Worker "merger" options, memoized on success. Falls back to an empty Hash
  # on NameError, which also covers NoMethodError (its subclass), e.g. a
  # worker class that does not respond to get_sidekiq_options.
  def options
    @options ||= self.class.get_options(worker_class)
  rescue NameError
    {}
  end
end