yuemori/activerecord-shard_for

View on GitHub
lib/activerecord/shard_for/model.rb

Summary

Maintainability
A
25 mins
Test Coverage
A
100%
require 'active_support/concern'

module ActiveRecord
  module ShardFor
    module Model
      extend ActiveSupport::Concern

      included do
        class_attribute :connection_router, instance_writer: false
        class_attribute :shard_repository, instance_writer: false
        class_attribute :replication_mapping, instance_writer: false
        class_attribute :distkey, instance_writer: false
        class_attribute :service, instance_writer: false

        include ActiveRecord::ShardFor::Patch
      end

      module ClassMethods
        # The cluster config must be defined before `use_cluster`
        # @param [Symbol] name A cluster name which is set by ActiveRecord::ShardFor.configure
        def use_cluster(name, router_name, thread_pool_size_base: 3)
          cluster_config = ActiveRecord::ShardFor.config.fetch_cluster_config(name)
          connection_router_class = ActiveRecord::ShardFor.config.fetch_connection_router(router_name)
          self.connection_router = connection_router_class.new(cluster_config)
          self.shard_repository = ActiveRecord::ShardFor::ShardRepository.new(cluster_config, self)
          thread_size = (shard_repository.all.size * thread_pool_size_base)
          self.service = Expeditor::Service.new(
            executor: Concurrent::ThreadPoolExecutor.new(
              min_threads: thread_size,
              max_threads: thread_size,
              max_queue: shard_repository.all.size,
              fallback_policy: :abort
            )
          )
          self.abstract_class = true
        end

        # Returns a generated model class of included model which specific connection.
        # @param [Object] shard_key key of a shard connection
        # @yield [Class] generated model class which key of shard connection
        # @return [Class] generated model class which key of shard connection
        def using(shard_key)
          model = shard_repository.fetch_by_key(shard_key)
          yield model if block_given?
          model
        end

        # Returns a generated model class of included model class which has proper
        # connection config for the shard for given key.
        # @param [String] key A value of distkey
        # @return [Class] A generated model class for given distkey value
        def shard_for(key)
          connection_name = connection_router.fetch_connection_name(key)
          shard_repository.fetch(connection_name)
        end

        # Evaluate block in all shard instances.
        def shard_eval(&block)
          all_shards.each do |shard|
            shard.class_eval(&block)
          end
        end

        # Create new record with given attributes in proper shard for given key.
        # When distkey value is empty, raises ActiveRecord::ShardFor::MissingDistkeyAttribute
        # error.
        # @param [Hash] attributes
        # @return [ActiveRecord::Base] A shard model instance
        # @raise [ActiveRecord::ShardFor::MissingDistkeyAttribute]
        def put!(attributes)
          raise '`distkey` is not defined. Use `def_distkey`.' unless distkey

          @before_put_callback.call(attributes) if defined?(@before_put_callback) && @before_put_callback

          key = fetch_distkey_from_attributes(attributes)

          raise ActiveRecord::ShardFor::MissingDistkeyAttribute unless key

          shard_for(key).create!(attributes)
        end

        # Register hook to assign auto-generated distkey or something.
        # Sometimes you want to generates distkey value before validation. Since
        # activerecord-shard_for generates sub class of your models, AR's callback is not
        # useless for this usecase, so activerecord-shard_for offers its own callback method.
        # @example
        #   class User
        #     include ActiveRecord::ShardFor::Model
        #     use_cluster :user
        #     def_distkey :name
        #     before_put do |attributes|
        #       attributes[:name] = generate_name unless attributes[:name]
        #     end
        #   end
        def before_put(&block)
          @before_put_callback = block
        end

        # Returns nil when not found. Except that, is same as `.get!`.
        # @param [String] key
        # @return [ActiveRecord::Base, nil] A shard model instance
        def get(key)
          shard_for(key).find_by(distkey => key)
        end

        # `.get!` raises ActiveRecord::ShardFor::RecordNotFound which is child class of
        # `ActiveRecord::RecordNotFound` so you can rescue that exception as same
        # as AR's RecordNotFound.
        # @param [String] key
        # @return [ActiveRecord::Base] A shard model instance
        # @raise [ActiveRecord::ShardFor::RecordNotFound]
        def get!(key)
          model = get(key)
          return model if model

          raise ActiveRecord::ShardFor::RecordNotFound
        end

        # Distkey is a column. activerecord-shard_for gave to connection_router that value
        # and connection_router determine which shard to store.
        # @param [Symbol] column
        def def_distkey(column)
          self.distkey = column.to_sym
        end

        # Returns all generated shard model class. Useful to query to all shards.
        # @return [Array<Class>] An array of shard models
        # @example
        #   User.all_shards.flat_map {|m| m.find_by(name: 'alice') }.compact
        def all_shards
          shard_repository.all
        end

        # @return [ActiveRecord::ShardFor::AllShardsInParallel]
        # @example
        #   User.all_shards_in_parallel.map {|m| m.where.find_by(name: 'Alice') }.compact
        def all_shards_in_parallel
          AllShardsInParallel.new(all_shards, service: service)
        end
        alias_method :parallel, :all_shards_in_parallel

        # @param [Hash{Symbol => Symbol}] mapping A pairs of role name and
        #   AR model class name.
        def replicates_with(mapping)
          self.replication_mapping = ActiveRecord::ShardFor::ReplicationMapping.new(mapping)
        end

        # See example definitions in `spec/models.rb`.
        # @param [Symbol] A role name of target cluster.
        # @return [Class, Object] if block given then yielded result else
        #   target shard model.
        # @example
        #   UserReadonly.all_shards.each do |m|
        #     target_ids = m.where(age: 1).pluck(:id)
        #     m.switch(:master) do |master|
        #       master.where(id: target_ids).delete_all
        #     end
        #   end
        def switch(role_name, &block)
          replication_mapping.switch(self, role_name, &block)
        end

        private

        # @param [Hash] attributes
        # @return [Object or nil] distkey
        def fetch_distkey_from_attributes(attributes)
          key = attributes[distkey] || attributes[distkey.to_s]
          return key if key

          instance = all_shards.first.new(attributes)
          return unless instance.respond_to?(distkey)

          instance.send(distkey)
        end
      end
    end
  end
end