hlascelles/que-scheduler

View on GitHub
lib/que/scheduler/migrations.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

module Que
  module Scheduler
    module Migrations
      AUDIT_TABLE_NAME = Que::Scheduler::Audit::TABLE_NAME
      TABLE_COMMENT = %(
        SELECT description FROM pg_class
        LEFT JOIN pg_description ON pg_description.objoid = pg_class.oid
        WHERE relname = '#{AUDIT_TABLE_NAME}'
      ).freeze
      MAX_VERSION = Dir.glob("#{__dir__}/migrations/*").map { |d| File.basename(d) }.map(&:to_i).max

      class << self
        def migrate!(version:)
          # Like que, Do not migrate test DBs.
          return if defined?(Que::Testing)

          Que::Scheduler::Db.transaction do
            current = db_version
            if current < version
              migrate_up(current, version)
            elsif current > version
              migrate_down(current, version)
            end
          end
        end

        def db_version
          if audit_table_exists?
            return Que::Scheduler::VersionSupport.execute(TABLE_COMMENT).first[:description].to_i
          end

          Que::Scheduler::Db.count_schedulers.zero? ? 0 : 1
        end

        def audit_table_exists?
          result = Que::Scheduler::VersionSupport.execute(<<-SQL)
            SELECT * FROM information_schema.tables WHERE table_name = '#{AUDIT_TABLE_NAME}';
          SQL
          result.any?
        end

        # This method is only intended for use in squashed migrations
        def reenqueue_scheduler_if_missing
          return unless Que::Scheduler::Db.count_schedulers.zero?

          Que::Scheduler::VersionSupport.enqueue_a_job(Que::Scheduler::SchedulerJob)
        end

        private def migrate_up(current, version)
          if current.zero? # Version 1 does not use SQL
            Que::Scheduler::VersionSupport.enqueue_a_job(Que::Scheduler::SchedulerJob)
          end
          execute_step((current += 1), :up) until current == version
        end

        private def migrate_down(current, version)
          current += 1
          execute_step((current -= 1), :down) until current == version + 1
        end

        private def execute_step(number, direction)
          sql = File.read("#{__dir__}/migrations/#{number}/#{direction}.sql")
          Que::Scheduler::VersionSupport.execute(sql)
          return unless audit_table_exists?

          Que::Scheduler::VersionSupport.execute(
            "COMMENT ON TABLE que_scheduler_audit IS '#{direction == :up ? number : number - 1}'"
          )
        end
      end
    end
  end
end