ontohub/ontohub

View on GitHub
app/models/hets_instance.rb

Summary

Maintainability
A
45 mins
Test Coverage
class HetsInstance < ActiveRecord::Base
  class Error < ::StandardError; end
  class NoRegisteredHetsInstanceError < Error
    DEFAULT_MSG = 'There is no registered HetsInstance for this application'

    def initialize(msg = DEFAULT_MSG)
      super
    end
  end

  class NoSelectableHetsInstanceError < Error
    DEFAULT_MSG = <<-MSG
There is no HetsInstance which is reachable and
has a minimal Hets version of #{Hets.minimal_version_string}
    MSG

    def initialize(msg = DEFAULT_MSG)
      super
    end
  end

  STATES = %w(free force-free busy)
  MUTEX_KEY = :choose_hets_instance
  MUTEX_EXPIRATION = 2.minutes
  FORCE_FREE_WAITING_PERIOD = 1.days

  attr_accessible :name, :uri, :state, :queue_size

  before_save :set_up_state, unless: ->() { changed_attributes.key?("up") }
  before_save :set_state_updated_at
  after_create :start_update_clock

  validate :state, inclusion: {in: STATES}
  validate :queue_size, numericality: {greater_than_or_equal_to: 0}

  scope :active, -> do
    where(up: true).where('version >= ?', Hets.minimal_version_string)
  end
  scope :free, -> do
    where(state: 'free')
  end
  scope :force_free, -> do
    where(state: 'force-free')
  end
  scope :busy, -> do
    where(state: 'busy')
  end
  scope :load_balancing_order, -> do
    order('queue_size ASC').order('state_updated_at ASC')
  end

  class << self
    def with_instance!
      instance = choose!
      begin
        result = yield(instance)
      rescue StandardError
        exclusively { instance.finish_work! }
        raise
      end
      exclusively { instance.finish_work! }
      result
    end

    def choose!(try_again: true)
      raise NoRegisteredHetsInstanceError.new unless any?
      instance = exclusively { choose_set_instance }
      if instance && instance.send(:check_up_state)
        instance
      elsif try_again
        find_each { |hets_instance| hets_instance.send(:set_up_state!) }
        choose!(try_again: false)
      else
        instance || raise(NoSelectableHetsInstanceError.new)
      end
    end

    def check_up_state!(hets_instance_id)
      find(hets_instance_id).send(:set_up_state!)
    end

    protected

    def exclusively
      ::Semaphore.exclusively(MUTEX_KEY, expiration: MUTEX_EXPIRATION) { yield }
    end

    def increment_queue!
      if instance = yield
        instance.queue_size += 1
        instance.save!
        instance
      end
    end

    def choose_set_instance
      instance = active.free.first
      instance ||=
        increment_queue! { active.force_free.load_balancing_order.first }
      instance ||= increment_queue! { active.busy.load_balancing_order.first }
      instance.try(:set_busy!)
      instance
    end
  end

  # will result in 0.99 for <v0.99, something or other>
  def general_version
    version.split(', ').first[1..-1] if version
  end

  # will result in 1409043198 for <v0.99, 1409043198>
  def specific_version
    version.split(', ').last if version
  end

  def up?
    up
  end

  def to_s
    "#{name}(#{uri})"
  end

  def finish_work!
    reload
    Sidekiq::Status.unschedule(@force_free_job_id)
    self.queue_size -= 1 if queue_size > 0
    if queue_size > 0
      set_busy!
    else
      set_free!
    end
    save!
  end

  def set_free!
    self.state = 'free'
    save!
  end

  def set_force_free!
    if reload.state == 'busy'
      self.state = 'force-free'
      save!
    end
  end

  def set_busy!
    self.state = 'busy'
    @force_free_job_id =
      HetsInstanceForceFreeWorker.perform_in(FORCE_FREE_WAITING_PERIOD, id)
    save!
  end

  protected

  def check_up_state
    Hets::VersionCaller.new(self).call
  rescue Errno::ECONNREFUSED, Errno::ETIMEDOUT, Errno::ECONNRESET
    nil
  end

  def set_up_state
    version = check_up_state
    self.up = !! version
    self.version = version if up
  end

  def set_up_state!
    set_up_state
    save!
  end

  def set_state_updated_at
    self.state_updated_at = Time.now if state_changed?
  end

  def start_update_clock
    HetsInstanceWorker.schedule_update(id)
  end
end