glitch-soc/mastodon

View on GitHub
app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb

Summary

Maintainability
B
4 hrs
Test Coverage
# frozen_string_literal: true

class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker
  include Redisable

  # This limit is mostly to be nice to the fediverse at large and not
  # generate too much traffic.
  # This also helps limiting the running time of the scheduler itself.
  MAX_BUDGET         = 300

  # This is an attempt to spread the load across remote servers, as
  # spreading deletions across diverse accounts is likely to spread
  # the deletion across diverse followers. It also helps each individual
  # user see some effect sooner.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular server can handle.
  PER_THREAD_BUDGET  = 5

  # These are latency limits on various queues above which a server is
  # considered to be under load, causing the auto-deletion to be entirely
  # skipped for that run.
  LOAD_LATENCY_THRESHOLDS = {
    default: 5,
    push: 10,
    # The `pull` queue has lower priority jobs, and it's unlikely that
    # pushing deletes would cause much issues with this queue if it didn't
    # cause issues with `default` and `push`. Yet, do not enqueue deletes
    # if the instance is lagging behind too much.
    pull: 5.minutes.to_i,
  }.freeze

  sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i

  def perform
    return if under_load?

    budget = compute_budget

    # If the budget allows it, we want to consider all accounts with enabled
    # auto cleanup at least once.
    #
    # We start from `first_policy_id` (the last processed id in the previous
    # run) and process each policy until we loop to `first_policy_id`,
    # recording into `affected_policies` any policy that caused posts to be
    # deleted.
    #
    # After that, we set `full_iteration` to `false` and continue looping on
    # policies from `affected_policies`.
    first_policy_id   = last_processed_id || 0
    first_iteration   = true
    full_iteration    = true
    affected_policies = []

    loop do
      num_processed_accounts = 0

      scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
      scope.find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        budget -= num_deleted

        unless num_deleted.zero?
          num_processed_accounts += 1
          affected_policies << policy.id if full_iteration
        end

        full_iteration = false if !first_iteration && policy.id >= first_policy_id

        if budget.zero?
          save_last_processed_id(policy.id)
          break
        end
      end

      # The idea here is to loop through all policies at least once until the budget is exhausted
      # and start back after the last processed account otherwise
      break if budget.zero? || (num_processed_accounts.zero? && !full_iteration)

      full_iteration  = false unless first_iteration
      first_iteration = false
    end
  end

  def compute_budget
    # Each post deletion is a `RemovalWorker` job (on `default` queue), each
    # potentially spawning many `ActivityPub::DeliveryWorker` jobs (on the `push` queue).
    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end

  def under_load?
    LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
  end

  private

  def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
    scope = AccountStatusesCleanupPolicy.where(enabled: true)

    if full_iteration
      # If we are doing a full iteration, examine all policies we have not examined yet
      if first_iteration
        scope.where(id: first_policy_id...)
      else
        scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
      end
    else
      # Otherwise, examine only policies that previously yielded posts to delete
      scope.where(id: affected_policies)
    end
  end

  def queue_under_load?(name, max_latency)
    Sidekiq::Queue.new(name).latency > max_latency
  end

  def last_processed_id
    redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
  end

  def save_last_processed_id(id)
    if id.nil?
      redis.del('account_statuses_cleanup_scheduler:last_policy_id')
    else
      redis.set('account_statuses_cleanup_scheduler:last_policy_id', id, ex: 1.hour.seconds)
    end
  end
end