bumbleworks/bumbleworks

View on GitHub
lib/bumbleworks/worker/info.rb

Summary

Maintainability
B
6 hrs
Test Coverage
require_relative "proxy"

class Bumbleworks::Worker < Ruote::Worker
  class Info < Ruote::Worker::Info
    # The wrapped worker object, as handed to #initialize or rebuilt by #reload.
    attr_reader :worker
    # Forwardable supplies the def_delegators macro used below.
    extend Forwardable
    # Enumerable is extended onto the class itself, so the class-level `each`
    # powers class-level collection methods such as `to_a` and `select`.
    extend Enumerable

    # Instance-level readers forwarded to the wrapped worker.
    def_delegators :worker,
      :id, :pid, :name, :state, :ip, :hostname, :system, :launched_at

    class << self
      # Fetches the worker-info data published by the Bumbleworks dashboard.
      #
      # @return [Object] the value of `Bumbleworks.dashboard.worker_info`, or an
      #   empty Hash when that value is nil or false
      def raw_hash
        reported = Bumbleworks.dashboard.worker_info
        return {} unless reported

        reported
      end

      # Iterates over the class-level raw_hash, building one Info per entry via
      # from_hash and yielding it. Only the entry's value is used; its key is
      # ignored.
      #
      # Without a block an Enumerator is returned instead of raising
      # LocalJumpError on the first entry.
      #
      # @yieldparam info [Bumbleworks::Worker::Info] Info built from one entry
      # @return [Enumerator] when no block is given
      # @return [Object] otherwise, whatever `raw_hash.each` returns
      def each
        return enum_for(:each) unless block_given?

        raw_hash.each { |_id, attributes| yield from_hash(attributes) }
      end

      # Collects everything produced by the class-level `each`.
      #
      # @return [Array] all yielded elements, in iteration order
      def all
        entries
      end

      # Selects the entries for which every key/value pair in +options+ matches.
      # Each key is converted to a Symbol and sent to the object that `filter`
      # yields; the result is compared to the expected value with `==`.
      #
      # @param options [Hash] attribute name => expected value
      # @return [Object] whatever `filter` returns for the matching predicate
      def where(options)
        filter do |worker|
          options.all? { |attribute, expected| worker.send(attribute.to_sym) == expected }
        end
      end

      # Selects the entries whose wrapped worker satisfies the given block.
      # The block receives `info.worker`, not the Info itself, while the
      # returned collection contains the Info objects.
      #
      # @yieldparam worker [Object] the worker wrapped by each entry
      # @return [Array] the matching entries; an empty Array when no block is given
      def filter
        if block_given?
          select { |info| yield(info.worker) }
        else
          []
        end
      end

      # Looks up one entry of the class-level raw_hash and wraps it via from_hash.
      # No presence check is made: for an unknown id, from_hash receives
      # whatever the lookup returns (nil for a plain Hash).
      #
      # @param worker_id [Object] key into raw_hash
      # @return [Bumbleworks::Worker::Info]
      def [](worker_id)
        attributes = raw_hash[worker_id]
        from_hash(attributes)
      end

      # Builds an instance of the receiving class around a
      # Bumbleworks::Worker::Proxy constructed from +hash+.
      #
      # @param hash [Object] passed unchanged to Bumbleworks::Worker::Proxy.new
      # @return [Bumbleworks::Worker::Info]
      def from_hash(hash)
        proxy = Bumbleworks::Worker::Proxy.new(hash)
        new(proxy)
      end

      # Purges the stored entry whose id equals +id_to_delete+.
      #
      # @param id_to_delete [Object] id compared against each stored id with `==`
      # @return [Object] whatever purge_worker_info returns
      def forget_worker(id_to_delete)
        purge_worker_info { |id, _info| id == id_to_delete }
      end

      # Purges every stored entry whose 'state' is nil or 'stopped'.
      #
      # @return [Object] whatever purge_worker_info returns
      def purge_stale_worker_info
        purge_worker_info { |_id, info|
          state = info['state']
          state.nil? || state == 'stopped'
        }
      end

      # Removes from the stored 'workers' variable document every entry for
      # which +block+ returns a truthy value, then writes the document back.
      #
      # A truthy result from `storage.put` is treated as "the write did not go
      # through", and the purge is retried against a freshly fetched document
      # using the SAME predicate. The retry used to call
      # purge_stale_worker_info, which swapped in a different predicate, so a
      # forget_worker call could purge stale workers and leave the requested
      # one in place.
      #
      # @yieldparam id [Object] key of the stored entry
      # @yieldparam info [Hash] stored attributes of that entry
      # @yieldreturn [Boolean] truthy to remove the entry
      # @return [Array, nil] `all` after the purge, or nil when the document
      #   does not exist
      def purge_worker_info(&block)
        storage = Bumbleworks.dashboard.storage
        doc = storage.get('variables', 'workers')
        return unless doc

        doc['workers'] = doc['workers'].reject { |id, info| block.call(id, info) }

        # Retry with the caller's own predicate when the write was rejected.
        purge_worker_info(&block) if storage.put(doc)
        all
      end
    end

    # Wraps +worker+. Does not call super.
    #
    # @last_save starts 120 seconds in the past.
    # NOTE(review): presumably so that a time-based save throttle elsewhere
    # treats a fresh Info as overdue; confirm against the parent class.
    #
    # @msgs is only initialised (to an empty Array) when the worker is not a
    # Bumbleworks::Worker::Proxy; #save checks `defined?(@msgs)` to decide
    # whether to compute message statistics.
    #
    # @param worker [Object] a worker or a Bumbleworks::Worker::Proxy
    def initialize(worker)
      @worker = worker
      @last_save = Time.now - 120
      return if worker.is_a?(Bumbleworks::Worker::Proxy)

      @msgs = []
    end

    # Two Infos are equal when +other+ is a Bumbleworks::Worker::Info and its
    # worker equals this one's (compared as `other.worker == worker`).
    #
    # @param other [Object]
    # @return [Boolean]
    def ==(other)
      return false unless other.is_a?(Bumbleworks::Worker::Info)

      other.worker == worker
    end

    # Looks up this worker's entry in the class-level raw_hash, keyed by the
    # worker's id.
    #
    # @return [Object, nil] the stored entry for `worker.id`
    def raw_hash
      registry = self.class.raw_hash
      registry[worker.id]
    end

    # Replaces the wrapped worker with a Bumbleworks::Worker::Proxy built from
    # this Info's current raw_hash.
    #
    # @return [Bumbleworks::Worker::Proxy] the new wrapped worker
    def reload
      refreshed = Bumbleworks::Worker::Proxy.new(raw_hash)
      @worker = refreshed
    end

    # Assigns +state+ to the wrapped worker and persists via #save.
    #
    # @param state [Object] the new state value
    # @return [Object] whatever #save returns
    def record_new_state(state)
      target = worker
      target.state = state
      save
    end

    # @return [Object] the wrapped worker's `class_name`
    def worker_class_name
      wrapped = worker
      wrapped.class_name
    end

    # Reports how long the worker has been up.
    #
    # When the worker is in a stopped state and itself responds to `uptime`,
    # that value is returned. Otherwise the uptime is computed as the time
    # elapsed since `worker.launched_at`.
    #
    # @return [Object] `worker.uptime`, or `Time.now - worker.launched_at`
    def uptime
      return worker.uptime if in_stopped_state? && worker.respond_to?(:uptime)

      Time.now - worker.launched_at
    end

    # @return [Boolean] true when the worker's state is nil, "stopped" or
    #   "stalled"
    def in_stopped_state?
      case worker.state
      when nil, "stopped", "stalled" then true
      else false
      end
    end

    # Parses the 'put_at' value of this worker's stored entry.
    #
    # @return [Time] result of `Time.parse` on the stored 'put_at' value
    def updated_at
      stamp = raw_hash['put_at']
      Time.parse(stamp)
    end

    # @param latest_time [Time] threshold to compare against
    # @return [Boolean] true when #updated_at is at or after +latest_time+
    def updated_since?(latest_time)
      last_update = updated_at
      last_update >= latest_time
    end

    # Checks whether the stored entry was updated within a recent window.
    #
    # The caller's +options+ hash is only read. The default used to be written
    # back into it with `||=`, which leaked into the caller's hash and raised
    # on a frozen one.
    #
    # @param options [Hash]
    # @option options [Numeric] :seconds_ago window size in seconds; falls back
    #   to `Bumbleworks.timeout` when missing or nil
    # @return [Boolean] result of #updated_since? for `Time.now - seconds_ago`
    def updated_recently?(options = {})
      seconds_ago = options[:seconds_ago] || Bumbleworks.timeout
      updated_since?(Time.now - seconds_ago)
    end

    # Waits, inside `Bumbleworks::Worker.with_worker_state_enabled`, until the
    # stored entry has been updated since the reference time.
    #
    # The options handed to `Bumbleworks::Support.wait_until` are a copy of
    # +options+ with :since filled in, as before. The caller's hash is no
    # longer mutated to achieve that.
    #
    # @param options [Hash] forwarded to `Bumbleworks::Support.wait_until`
    # @option options [Time] :since reference time; defaults to
    #   `Time.now - Bumbleworks.timeout` when missing or nil
    # @return [Boolean] true once updated; false when
    #   Bumbleworks::Support::WaitTimeout is raised
    def responding?(options = {})
      since = options[:since] || (Time.now - Bumbleworks.timeout)
      wait_options = options.merge(since: since)
      Bumbleworks::Worker.with_worker_state_enabled do
        Bumbleworks::Support.wait_until(wait_options) do
          updated_since?(since)
        end
      end
      true
    rescue Bumbleworks::Support::WaitTimeout
      false
    end

    # @return [Boolean] the negation of #responding? (called with its defaults)
    def stalling?
      return false if responding?

      true
    end

    # @return [Object] the wrapped worker's storage, or the dashboard's storage
    #   when the worker's is nil or false
    def storage
      own = @worker.storage
      return own if own

      Bumbleworks.dashboard.storage
    end

    # Issues the "stopped" command through #send_command.
    #
    # @param options [Hash] forwarded to #send_command
    # @return [Object] whatever #send_command returns
    def shutdown(options = {})
      target_state = "stopped"
      send_command(target_state, options)
    end

    # Issues the "paused" command through #send_command.
    #
    # @param options [Hash] forwarded to #send_command
    # @return [Object] whatever #send_command returns
    def pause(options = {})
      target_state = "paused"
      send_command(target_state, options)
    end

    # Issues the "running" command through #send_command.
    #
    # @param options [Hash] forwarded to #send_command
    # @return [Object] whatever #send_command returns
    def unpause(options = {})
      target_state = "running"
      send_command(target_state, options)
    end

    # `run` is an alias for `unpause`.
    alias_method :run, :unpause

    # Records +command+ as a control message, then waits (inside
    # `Bumbleworks::Worker.with_worker_state_enabled`) until this worker's
    # stored "state" equals +command+, and finally reloads the wrapped worker.
    # Nothing is rescued here, so an error raised while waiting propagates.
    #
    # @param command [String] the state to request and wait for
    # @param options [Hash] forwarded to `Bumbleworks::Support.wait_until`
    # @return [Object] whatever #reload returns
    def send_command(command, options = {})
      save_control_message(command)

      Bumbleworks::Worker.with_worker_state_enabled {
        Bumbleworks::Support.wait_until(options) { raw_hash["state"] == command }
      }

      reload
    end

    # Writes +message+ as the "state" of this worker's entry in the control
    # document (creating the entry if needed) and puts the document to storage.
    #
    # @param message [String] the requested state
    # @return [Object] whatever `storage.put` returns
    def save_control_message(message)
      doc = Bumbleworks::Worker.control_document
      entry = (doc["workers"][id] ||= {})
      entry["state"] = message
      storage.put(doc)
    end

    # @return [Object, nil] the stored "processed_last_minute" value
    def processed_last_minute
      stats = raw_hash
      stats["processed_last_minute"]
    end

    # @return [Object, nil] the stored "wait_time_last_minute" value
    def wait_time_last_minute
      stats = raw_hash
      stats["wait_time_last_minute"]
    end

    # @return [Object, nil] the stored "processed_last_hour" value
    def processed_last_hour
      stats = raw_hash
      stats["processed_last_hour"]
    end

    # @return [Object, nil] the stored "wait_time_last_hour" value
    def wait_time_last_hour
      stats = raw_hash
      stats["wait_time_last_hour"]
    end

    # Snapshots the wrapped worker's identifying attributes as a
    # string-keyed Hash. Every key matches the reader it is taken from,
    # except "class", which comes from `class_name`.
    #
    # @return [Hash{String => Object}]
    def constant_worker_info_hash
      readers = [
        ["id", :id],
        ["class", :class_name],
        ["name", :name],
        ["ip", :ip],
        ["hostname", :hostname],
        ["pid", :pid],
        ["system", :system],
        ["launched_at", :launched_at],
        ["state", :state]
      ]

      readers.each_with_object({}) do |(key, reader), snapshot|
        snapshot[key] = @worker.public_send(reader)
      end
    end

    # Persists this worker's entry in the worker-info document.
    #
    # The existing entry (or a new Hash) is updated with the constant worker
    # attributes, a 'put_at' timestamp and the current uptime. When @msgs is
    # defined (see #initialize), message statistics are added as well.
    #
    # A non-nil result from `storage.put` triggers another full #save, with
    # no retry limit.
    #
    # @return [nil]
    def save
      doc = Bumbleworks::Worker.info_document

      entry = doc['workers'][@worker.id] || {}
      entry.merge!(constant_worker_info_hash)
      entry.merge!({ 'put_at' => Ruote.now_to_utc_s, 'uptime' => uptime })

      if defined?(@msgs)
        now = Time.now
        hour_cutoff = now - 3600
        minute_cutoff = now - 60

        # drop_while only trims the leading run of old messages, so this
        # relies on @msgs being ordered oldest-first.
        @msgs = @msgs.drop_while { |msg| Time.parse(msg['processed_at']) < hour_cutoff }
        last_minute = @msgs.drop_while { |msg| Time.parse(msg['processed_at']) < minute_cutoff }

        total_wait = lambda { |msgs| msgs.inject(0.0) { |sum, msg| sum + msg['wait_time'] } }

        # Averages divide by at least 1 so an empty window yields 0.0.
        entry.merge!({
          'processed_last_minute' => last_minute.size,
          'wait_time_last_minute' => total_wait.call(last_minute) / [last_minute.size, 1].max.to_f,
          'processed_last_hour' => @msgs.size,
          'wait_time_last_hour' => total_wait.call(@msgs) / [@msgs.size, 1].max.to_f
        })
      end

      doc['workers'][@worker.id] = entry

      put_result = storage.put(doc)
      @last_save = Time.now

      save unless put_result.nil?
    end
  end
end