thiagopradi/octopus

View on GitHub
lib/octopus/migration.rb

Summary

Maintainability
C
1 day
Test Coverage
require 'set'
require 'active_support/core_ext/module/aliasing'
require 'active_support/core_ext/array/wrap'

module Octopus
  module Migration
    module InstanceOrClassMethods
      def announce_with_octopus(message)
        announce_without_octopus("#{message} - #{current_shard}")
      end

      def current_shard
        "Shard: #{connection.current_shard}" if connection.respond_to?(:current_shard)
      end
    end

    include InstanceOrClassMethods

    def self.included(base)
      base.extend(ClassMethods)

      base.send :alias_method, :announce_without_octopus, :announce
      base.send :alias_method, :announce, :announce_with_octopus

      base.class_attribute :current_shard, :current_group, :current_group_specified, :instance_reader => false, :instance_writer => false
    end

    module ClassMethods
      def using(*args)
        return self unless connection.is_a?(Octopus::Proxy)

        self.current_shard = args
        self
      end

      def using_group(*groups)
        return self unless connection.is_a?(Octopus::Proxy)

        self.current_group = groups
        self.current_group_specified = true
        self
      end

      def shards
        shards = Set.new

        if (groups = (current_group_specified ? current_group : Octopus.config[:default_migration_group]))
          Array.wrap(groups).each do |group|
            group_shards = connection.shards_for_group(group)
            shards.merge(group_shards) if group_shards
          end
        elsif (shard = current_shard)
          shards.merge(Array.wrap(shard))
        end

        shards.to_a.presence || [Octopus.master_shard]
      end
    end
  end
end

module Octopus
  module Migrator
    def self.included(base)
      unless Octopus.atleast_rails52?
        base.extend(ClassMethods)

        base.class_eval do
          class << self
              alias_method :migrate_without_octopus, :migrate
              alias_method :migrate, :migrate_with_octopus
              
              alias_method :up_without_octopus, :up
              alias_method :up, :up_with_octopus

              alias_method :down_without_octopus, :down
              alias_method :down, :down_with_octopus

              alias_method :run_without_octopus, :run
              alias_method :run, :run_with_octopus

              alias_method :rollback_without_octopus, :rollback
              alias_method :rollback, :rollback_with_octopus
          end
        end
      end

      base.send :alias_method, :run_without_octopus, :run
      base.send :alias_method, :run, :run_with_octopus

      base.send :alias_method, :migrate_without_octopus, :migrate
      base.send :alias_method, :migrate, :migrate_with_octopus

      base.send :alias_method, :migrations_without_octopus, :migrations
      base.send :alias_method, :migrations, :migrations_with_octopus
    end
    if Octopus.atleast_rails52?
      ### Post RAILS 5.2 Migration methods
      
      def run_with_octopus(&block)
        return run_without_octopus(&block) unless connection.is_a?(Octopus::Proxy)
        shards = migrations.map(&:shards).flatten.map(&:to_s)
        connection.send_queries_to_multiple_shards(shards) do
          run_without_octopus(&block)
        end
        rescue ActiveRecord::UnknownMigrationVersionError => e
          raise unless migrations(true).detect { |m| m.version == e.version }
      end

      def migrate_with_octopus(&block)
        return migrate_without_octopus(&block) unless connection.is_a?(Octopus::Proxy)
        shards = migrations.map(&:shards).flatten.map(&:to_s)
        connection.send_queries_to_multiple_shards(shards) do
          migrate_without_octopus(&block)
        end
        rescue ActiveRecord::UnknownMigrationVersionError => e
          raise unless migrations(true).detect { |m| m.version == e.version }
      end

      def migrations_with_octopus(shard_agnostic = true)
        migrations = migrations_without_octopus
        return migrations if !connection.is_a?(Octopus::Proxy) || shard_agnostic

        migrations.select { |m| m.shards.include?(connection.current_shard.to_sym) }
      end
      
      private 
      
      def connection
        ActiveRecord::Base.connection
      end
      
    else 
      ### Pre RAILS 5.2 Migration methods
      
      def run_with_octopus(&block)
        run_without_octopus(&block)
      rescue ActiveRecord::UnknownMigrationVersionError => e
        raise unless migrations(true).detect { |m| m.version == e.version }
      end

      def migrate_with_octopus(&block)
        migrate_without_octopus(&block)
      rescue ActiveRecord::UnknownMigrationVersionError => e
        raise unless migrations(true).detect { |m| m.version == e.version }
      end

      def migrations_with_octopus(shard_agnostic = false)
        connection = ActiveRecord::Base.connection
        migrations = migrations_without_octopus
        return migrations if !connection.is_a?(Octopus::Proxy) || shard_agnostic

        migrations.select { |m| m.shards.include?(connection.current_shard.to_sym) }
      end

      module ClassMethods
        def migrate_with_octopus(migrations_paths, target_version = nil, &block)
          return migrate_without_octopus(migrations_paths, target_version, &block) unless connection.is_a?(Octopus::Proxy)

          connection.send_queries_to_multiple_shards(connection.shard_names) do
            migrate_without_octopus(migrations_paths, target_version, &block)
          end
        end

        def up_with_octopus(migrations_paths, target_version = nil, &block)
          return up_without_octopus(migrations_paths, target_version, &block) unless connection.is_a?(Octopus::Proxy)
          return up_without_octopus(migrations_paths, target_version, &block) unless connection.current_shard.to_s == Octopus.master_shard.to_s

          connection.send_queries_to_multiple_shards(connection.shard_names) do
            up_without_octopus(migrations_paths, target_version, &block)
          end
        end

        def down_with_octopus(migrations_paths, target_version = nil, &block)
          return down_without_octopus(migrations_paths, target_version, &block) unless connection.is_a?(Octopus::Proxy)
          return down_without_octopus(migrations_paths, target_version, &block) unless connection.current_shard.to_s == Octopus.master_shard.to_s

          connection.send_queries_to_multiple_shards(connection.shard_names) do
            down_without_octopus(migrations_paths, target_version, &block)
          end
        end

        def run_with_octopus(direction, migrations_paths, target_version)
          return run_without_octopus(direction, migrations_paths, target_version) unless connection.is_a?(Octopus::Proxy)

          connection.send_queries_to_multiple_shards(connection.shard_names) do
            run_without_octopus(direction, migrations_paths, target_version)
          end
        end

        def rollback_with_octopus(migrations_paths, steps = 1)
          return rollback_without_octopus(migrations_paths, steps) unless connection.is_a?(Octopus::Proxy)

          connection.send_queries_to_multiple_shards(connection.shard_names) do
            rollback_without_octopus(migrations_paths, steps)
          end
        end

        private

        def connection
          ActiveRecord::Base.connection
        end
      end
    end
  end
end

module Octopus
  module MigrationProxy
    def shards
      migration.class.shards
    end
  end
end

module Octopus
  module UnknownMigrationVersionError
    def self.included(base)
      base.send :alias_method, :initialize_without_octopus, :initialize
      base.send :alias_method, :initialize, :initialize_with_octopus

      base.send(:attr_accessor, :version)
    end

    def initialize_with_octopus(version)
      @version = version
      initialize_without_octopus(version)
    end
  end
end

ActiveRecord::Migration.send(:include, Octopus::Migration)
ActiveRecord::Migrator.send(:include, Octopus::Migrator)
ActiveRecord::MigrationProxy.send(:include, Octopus::MigrationProxy)
ActiveRecord::UnknownMigrationVersionError.send(:include, Octopus::UnknownMigrationVersionError)