heelhook/frisky

View on GitHub
lib/frisky/commands/event_scheduler.rb

Summary

Maintainability
A
2 hrs
Test Coverage
require 'frisky/command'
require 'frisky'

module Frisky
  module Commands
    # The EventScheduler pulls events from GitHub's API and, for every event
    # that is not already known, pushes a job onto the Resque queue of each
    # connected classifier that has registered interest in that event type.
    class EventScheduler < Frisky::Command
      # Seconds a classifier may go without pinging before it is dropped; also
      # how far into the future a ping timestamp may lie and still be accepted.
      CLASSIFIER_PING_WINDOW = 90

      # Number of events requested per page from the events endpoint.
      EVENTS_PER_PAGE = 30

      # Number of backtrace lines logged when pushing a job fails.
      BACKTRACE_LOG_LINES = 10

      # Cached classifiers: classifier name => collection of event types it
      # accepts. Refreshed in place by #fetch_loaded_classifiers.
      attr_reader :classifiers

      # Forwards all arguments to Frisky::Command and starts with an empty
      # classifier cache.
      def initialize(*args)
        super(*args)

        @classifiers = {}
      end

      # Fetches events from the URL in @options[:url] and schedules every
      # event that Frisky::Model::Event does not already know about.
      #
      # @param pages_to_fetch [Integer] number of pages to request, starting
      #   at page 1
      # @return the result of the final log call (callers should not rely on it)
      def perform(pages_to_fetch = 1)
        pushed_jobs = 0

        pages_to_fetch.times do |page_index|
          # The API pages are 1-based while #times counts from 0.
          events = Octokit.get(@options[:url], per_page: EVENTS_PER_PAGE, page: page_index + 1)

          events.each do |event_raw|
            next if Frisky::Model::Event.exists?(event_raw)

            event = Frisky::Model::Event.load_from_raw(event_raw)
            pushed_jobs += enqueue_for_classifiers(event)
          end
        end

        Frisky.log.info "Created #{pushed_jobs} jobs"
      end

      # Removes expired classifiers and caches the remaining ones.
      # TODO -- Check for backed up queues and skip them so huge backlogs are avoided.
      #
      # Classifiers exist on a sorted set in redis, using the time of last ping
      # as the score, and the name of the classifier as the key.
      #
      # This method is called periodically to prune the list of classifiers and cache
      # the available classifiers so each event is pushed to each classifier.
      #
      # Classifiers also push a filter to a set, so they will only receive the type of events
      # they are interested in processing. The set read here is named
      # classifier:#{name}:config:events, and its members are cached too.
      #
      # @return [Hash] the refreshed classifier cache (the same object as #classifiers)
      def fetch_loaded_classifiers
        now = Time.now.utc.to_i
        oldest_live_ping = now - CLASSIFIER_PING_WINDOW

        # "(" makes the upper bound exclusive: a classifier whose last ping is
        # exactly CLASSIFIER_PING_WINDOW seconds old is still considered alive.
        expired = Frisky.redis.zrangebyscore("classifiers", "-inf", "(#{oldest_live_ping}")

        if expired.any?
          Frisky.log.warn "Dropping expired classifiers: #{expired.join(', ')}"
          Frisky.redis.zrem("classifiers", expired)
        end

        # Only accept pings within the window on either side of now, so that
        # classifiers registered too far in the future are skipped.
        newest_allowed_ping = now + CLASSIFIER_PING_WINDOW
        live_names = Frisky.redis.zrangebyscore("classifiers", oldest_live_ping.to_s, newest_allowed_ping.to_s)

        new_classifiers = live_names.each_with_object({}) do |classifier_name, loaded|
          loaded[classifier_name] = Frisky.redis.smembers("classifier:#{classifier_name}:config:events")
        end

        added = new_classifiers.keys - @classifiers.keys
        Frisky.log.info "Loaded new classifiers: #{added.join(', ')}" if added.any?

        # Replace the contents in place so anyone holding the hash returned by
        # #classifiers keeps seeing the current data.
        @classifiers.replace(new_classifiers)
      end

      # Refreshes the classifier cache and schedules events. Runs once, or
      # repeatedly with a pause of @options[:loop] seconds when that option is set.
      def run
        loop do
          fetch_loaded_classifiers

          # Don't perform if we have no classifiers loaded that will do something with the data
          if @classifiers.any?
            perform
          else
            Frisky.log.info "Waiting for classifiers to connect"
          end

          interval = @options[:loop]
          break unless interval

          sleep interval.to_i
        end
      end

      private

      # Pushes +event+ to the queue of every cached classifier whose supported
      # event types include the event's type.
      #
      # A failure for one classifier is logged and does not prevent delivery
      # to the remaining classifiers.
      #
      # @param event the event built by Frisky::Model::Event.load_from_raw
      # @return [Integer] number of jobs pushed for this event
      def enqueue_for_classifiers(event)
        pushed = 0

        @classifiers.each do |name, event_types|
          begin
            # Does this classifier support this event?
            next unless event_types.include?(event.type)

            Resque.push(name, 'class' => name, args: event.serialize.to_s)
            pushed += 1
          rescue StandardError => error
            # The rescued exception must not share a name with the event: a
            # shared local would be overwritten and break later iterations.
            Frisky.log.warn "[#{error.class}] #{error.message}"
            Array(error.backtrace).first(BACKTRACE_LOG_LINES).each { |line| Frisky.log.info line }
          end
        end

        pushed
      end
    end
  end
end