amatriain/feedbunch

View on GitHub
FeedBunch-app/lib/schedule_manager.rb

Summary

Maintainability
B
5 hrs
Test Coverage
# frozen_string_literal: true

##
# Class with methods related to managing update schedules.

class ScheduleManager

  ##
  # For each available feed in the database, ensure that the next update of the feed is scheduled
  #
  # If a feed is found with no scheduled update, one is added.
  #
  # After invoking this method all available feeds are guaranteed to have their next update scheduled.

  def self.fix_scheduled_updates
    Rails.logger.debug 'Fixing scheduled feed updates'

    queue = Sidekiq::Queue.new 'update_feeds'
    queued_ids = queue.select{|job| job.klass == 'ScheduledUpdateFeedWorker'}.map{|job| job.args[0]}

    scheduled_set = Sidekiq::ScheduledSet.new
    scheduled_ids = scheduled_set.select{|job| job.klass == 'ScheduledUpdateFeedWorker'}.map{|job| job.args[0]}

    retrySet = Sidekiq::RetrySet.new
    retry_ids = retrySet.select{|job| job.klass == 'ScheduledUpdateFeedWorker'}.map{|job| job.args[0]}

    workers = Sidekiq::Workers.new
    worker_ids = workers.select{|process_id, thread_id, work| work['payload']['class'] == 'ScheduledUpdateFeedWorker'}.map{|process_id, thread_id, work| work['payload']['args'][0]}

    feeds_unscheduled = []

    Feed.where(available: true).find_each do |feed|
      # count how many update workers there are for each feed in Sidekiq
      schedule_count = feed_schedule_count feed.id, queued_ids, scheduled_ids, retry_ids, worker_ids
      Rails.logger.debug "Update schedule for feed #{feed.id}  #{feed.title} present #{schedule_count} times"

      # if a feed has no update schedule, add it to the array of feeds to be fixed
      if schedule_count == 0
        Rails.logger.warn "Missing schedule for feed #{feed.id} - #{feed.title}"
        feeds_unscheduled << feed
      elsif schedule_count > 1
        # there should be one scheduled update for each feed.
        # If a feed has more than one scheduled update, remove all updates for the feed and add it to the array of feeds to be fixed
        Rails.logger.warn "Feed #{feed.id} - #{feed.title} is scheduled more than one time, removing all scheduled updates to re-add just one"
        unschedule_feed_updates feed.id
        feeds_unscheduled << feed
      end
    end

    if feeds_unscheduled.length > 0
      Rails.logger.warn "A total of #{feeds_unscheduled.length} feeds are missing their update schedules. Adding missing schedules."
      feeds_unscheduled.each do |feed|
        add_missing_schedule feed
      end
    end
  end

  ##
  # Schedule the first update of a feed.
  # Receives as argument the id of the feed for which its first update will be scheduled.
  #
  # The update is scheduled to run in a random amount of minutes, between 0 and 60, after this
  # method is invoked. This is done so that feed updates are more or less
  # evenly, or at least randomly, spaced in time. This way the server load from the updates is spaced over
  # time, to affect user experience as little as possible.
  #
  # After each update, the worker schedules the next update of the feed. The worker tries to adapt
  # the scheduling to the rate at which new entries appear in the feed.

  def self.schedule_first_update(feed_id)
    delay = Random.rand 61
    Rails.logger.info "Scheduling updates of feed #{feed_id} every hour, starting #{delay} minutes from now at #{Time.zone.now + delay.minutes}"
    set_scheduled_update feed_id, delay.minutes
  end

  ##
  # Unschedule (this is, remove from scheduling) future updates of the passed feed.
  # Receives as argument the id of the feed; scheduled updates for other feeds are unaffected.
  #
  # Normally when this method is invoked the feed is also have to be marked as unavailable.
  # After invoking this method, if the feed is marked as unavailable, it won't be updated again.
  # However if it's marked as available the next time FixSchedulesWorker runs (normally daily),
  # periodic updates will start running again. Because of this, if we really want a feed to
  # stop updating it's not enough to invoke this method, the "available" flag of the feed
  # must be set to false as well.

  def self.unschedule_feed_updates(feed_id)
    Rails.logger.info "Unscheduling updates of feed #{feed_id}"

    queue = Sidekiq::Queue.new 'update_feeds'
    queue_jobs = queue.select {|job| job.klass == 'ScheduledUpdateFeedWorker' && job.args[0] == feed_id}
    Rails.logger.info "Feed #{feed_id} update found in 'update_feeds' queue #{queue_jobs.size} times, deleting" if queue_jobs.size > 0
    queue_jobs.each {|job| job.delete}

    scheduled_set = Sidekiq::ScheduledSet.new
    scheduled_job = scheduled_set.select {|job| job.klass == 'ScheduledUpdateFeedWorker' && job.args[0] == feed_id}
    Rails.logger.info "Feed #{feed_id} update scheduled #{scheduled_job.size} times, deleting" if scheduled_job.size > 0
    scheduled_job.each {|job| job.delete}

    retrying = Sidekiq::RetrySet.new
    retrying_job = retrying.select {|job| job.klass == 'ScheduledUpdateFeedWorker' && job.args[0] == feed_id}
    Rails.logger.info "Feed #{feed_id} update marked for retrying #{retrying_job.size} times, deleting" if retrying_job.size > 0
    retrying_job.each {|job| job.delete}
  end

  ##
  # Decrement the interval between updates of the passed feed.
  # The current interval is decremented by 10% up to the minimum set in
  # the application configuration.

  def self.decrement_update_interval(feed)
    new_interval = (feed.fetch_interval_secs * 0.9).round
    min = Feedbunch::Application.config.min_update_interval
    new_interval = min if new_interval < min

    # Add up to +/- 1 minute to the update interval, to add some entropy and distribute updates more evenly over time.
    entropy = schedule_entropy
    new_interval += entropy.seconds

    # Decrement the update interval saved in the database
    Rails.logger.debug "Decrementing update interval of feed #{feed.id} - #{feed.title} to #{new_interval} seconds"
    feed.update fetch_interval_secs: new_interval

    # Actually decrement the update interval
    set_scheduled_update feed.id, feed.fetch_interval_secs
  end

  ##
  # Increment the interval between updates of the passed feed.
  # The current interval is incremented by 10% up to the maximum set in
  # the application configuration.

  def self.increment_update_interval(feed)
    new_interval = (feed.fetch_interval_secs * 1.1).round
    max = Feedbunch::Application.config.max_update_interval
    new_interval = max if new_interval > max

    # Add up to +/- 1 minute to the update interval, to add some entropy and distribute updates more evenly over time.
    entropy = schedule_entropy
    new_interval += entropy.seconds

    # Increment the update interval saved in the database
    Rails.logger.debug "Incrementing update interval of feed #{feed.id} - #{feed.title} to #{new_interval} seconds"
    feed.update fetch_interval_secs: new_interval

    # Actually increment the update interval
    set_scheduled_update feed.id, feed.fetch_interval_secs
  end

  #############################
  # PRIVATE CLASS METHODS
  #############################

  ##
  # Return a random number between -60 and 60.
  # It can be used (as seconds) to add some entropy to schedules, so that scheduled updates are more evenly distributed.

  def self.schedule_entropy
    return Random.rand(121) - 60
  end
  private_class_method :schedule_entropy

  ##
  # Set a scheduled update for a feed.
  #
  # Receives as arguments:
  # - ID of the feed that will be updated
  # - in_seconds: number of seconds until the update runs

  def self.set_scheduled_update(feed_id, in_seconds)
    Rails.logger.info "Setting scheduled update of feed #{feed_id} in #{in_seconds} seconds"
    ScheduledUpdateFeedWorker.perform_in in_seconds.seconds, feed_id
  end
  private_class_method :set_scheduled_update

  ##
  # Add a missing update schedule for a feed. Receives the feed as argument.
  # The next update will be scheduled to run at fetch_interval_secs seconds after last_fetched, as if the schedule
  # had not disappeared, unless that time has already passed, in which case it will be scheduled immediately.

  def self.add_missing_schedule(feed)
    Rails.logger.warn "Adding missing update schedule for feed #{feed.id} - #{feed.title}"

    if feed.last_fetched.blank?
      # If feed has never been fetched, schedule its first update sometime in the next hour
      delay = Random.rand 61
      perform_at = Time.zone.now + delay.minutes
    else
      # Calculate when should the update happen, based on the last time the feed was updated
      # and its current feed interval.
      perform_at = feed.last_fetched + feed.fetch_interval_secs.seconds
    end

    # Schedule update. If the scheduled time is in the past (i.e. the feed should have been updated
    # in the past but it wasn't for some reason), Sidekiq will enqueue the job immediately.
    ScheduledUpdateFeedWorker.perform_at perform_at, feed.id
  end
  private_class_method :add_missing_schedule

  ##
  # Count how many scheduled updates are set for the passed feed.
  #
  # Receives as arguments:
  # - feed id
  # - array of IDs of feeds in the "update_feeds" queue
  # - array of IDs of feeds with a scheduled update
  # - array of IDs of feeds with updates marked for retrying
  # - array of IDs of feeds with updates currently running in a worker thread
  #
  # To count the number of feed updates that have been scheduled, the following Sidekiq queues passed as arguments are checked:
  # - The named "update_feeds" queue. The worker will be found there when its scheduled run time comes, until
  # a Sidekiq thread is free to process it.
  # - The queue with jobs scheduled to run in the future
  # - The queue of jobs that have failed (an error has raised during processing) and are scheduled to be retried in
  # the future
  # - The currently running jobs
  #
  # The number of times ScheduledFeedUpdateWorker is found in any of these queues with the id of the passed feed as argument
  # is counted and returned.

  def self.feed_schedule_count(feed_id, queued_ids, scheduled_ids, retry_ids, worker_ids)
    queued_count = update_queued_count feed_id, queued_ids
    scheduled_count = update_scheduled_count feed_id, scheduled_ids
    retrying_count = update_retrying_count feed_id, retry_ids
    running_count = update_running_count feed_id, worker_ids

    updates_count = queued_count + scheduled_count + retrying_count + running_count

    if updates_count > 0
      Rails.logger.info "Feed #{feed_id} update worker is present"
    else
      Rails.logger.info "Feed #{feed_id} update worker is not present"
    end

    return updates_count
  end
  private_class_method :feed_schedule_count

  ##
  # Count how many scheduled updates for the passed feed are already in the 'update_feeds' queue waiting
  # for a free Sidekiq thread to be processed.
  #
  # Receives as arguments:
  # - feed id
  # - array of IDs of feeds in the "update_feeds" queue
  #
  # Returns the count of scheduled updates, zero if none.

  def self.update_queued_count(feed_id, queued_ids)
    queued_count = queued_ids.count feed_id

    if queued_count > 0
      Rails.logger.info "Feed #{feed_id} update worker queued for immediate processing"
    else
      Rails.logger.info "Feed #{feed_id} update worker not queued for immediate processing"
    end

    return queued_count
  end
  private_class_method :update_queued_count

  ##
  # Count how many updates for the passed feed are scheduled.
  #
  # Receives as arguments:
  # - feed id
  # - array of IDs of feeds with a scheduled update
  #
  # Returns the count of scheduled updates for the passed feed, zero if none.

  def self.update_scheduled_count(feed_id, scheduled_ids)
    scheduled_count = scheduled_ids.count feed_id

    if scheduled_count > 0
      Rails.logger.info "Feed #{feed_id} update worker scheduled"
    else
      Rails.logger.info "Feed #{feed_id} update worker not scheduled"
    end

    return scheduled_count
  end
  private_class_method :update_scheduled_count

  ##
  # Count how many updates for the passed feed are scheduled for retrying.
  #
  # Receives as arguments:
  # - feed id
  # - array of IDs of feeds with updates marked for retrying
  #
  # Returns the count of updates scheduled for retrying, zero if none.

  def self.update_retrying_count(feed_id, retry_ids)
    retrying_count = retry_ids.count feed_id

    if retrying_count > 0
      Rails.logger.info "Feed #{feed_id} update worker scheduled for retrying"
    else
      Rails.logger.info "Feed #{feed_id} update worker not scheduled for retrying"
    end

    return retrying_count
  end
  private_class_method :update_retrying_count

  ##
  # Count how many updates for the passed feed are currently being processed.
  #
  # Receives as arguments:
  # - feed id
  # - array of IDs of feeds with updates currently running in a worker thread
  #
  # Returns the count of updates currently running, zero if no update for the passed feed is running.

  def self.update_running_count(feed_id, worker_ids)
    running_count = worker_ids.count feed_id

    if running_count > 0
      Rails.logger.info "Feed #{feed_id} update worker currently running"
    else
      Rails.logger.info "Feed #{feed_id} update worker currently not running"
    end

    return running_count
  end
  private_class_method :update_running_count
end