lib/mixed_gauge/model.rb
require 'active_support/concern'
module MixedGauge
# @example
# class User < ActiveRecord::Base
# include MixedGauge::Model
# use_cluster :user
# def_distkey :email
# replicates_with slave: :UserReadonly, backgroud: :UserBackground
# end
#
# User.put!(email: 'alice@example.com', name: 'alice')
#
# alice = User.get('alice@example.com')
# alice.age = 1
# alice.save!
#
# User.all_shards.flat_map {|m| m.where(name: 'alice') }.compact
module Model
extend ActiveSupport::Concern
included do
class_attribute :cluster_routing, instance_writer: false
class_attribute :shard_repository, instance_writer: false
class_attribute :distkey, instance_writer: false
class_attribute :replication_mapping, instance_writer: false
class_attribute :service, instance_writer: false
end
# ClassMethods
module ClassMethods
# The cluster config must be defined before `use_cluster`.
# @param [Symbol] name A cluster name which is set by MixedGauge.configure
def use_cluster(name, thread_pool_size_base: 3)
config = MixedGauge.config.fetch_cluster_config(name)
self.cluster_routing = MixedGauge::Routing.new(config)
self.shard_repository = MixedGauge::ShardRepository.new(config, self)
thread_size = (shard_repository.all.size * thread_pool_size_base)
self.service = Expeditor::Service.new(
executor: Concurrent::ThreadPoolExecutor.new(
min_threads: thread_size,
max_threads: thread_size,
max_queue: shard_repository.all.size,
fallback_policy: :abort
)
)
self.abstract_class = true
end
# Distkey is a column. mixed_gauge hashes that value and determine which
# shard to store.
# @param [Symbol] column
def def_distkey(column)
self.distkey = column.to_sym
end
# @param [Hash{Symbol => Symbol}] mapping A pairs of role name and
# AR model class name.
def replicates_with(mapping)
self.replication_mapping = MixedGauge::ReplicationMapping.new(mapping)
end
# Create new record with given attributes in proper shard for given key.
# When distkey value is empty, raises MixedGauge::MissingDistkeyAttribute
# error.
# @param [Hash] attributes
# @return [ActiveRecord::Base] A shard model instance
# @raise [MixedGauge::MissingDistkeyAttribute]
def put!(attributes)
raise '`distkey` is not defined. Use `def_distkey`.' unless distkey
@before_put_callback.call(attributes) if @before_put_callback
if (key = attributes[distkey]) || attributes[distkey.to_s]
shard_for(key).create!(attributes)
else
raise MixedGauge::MissingDistkeyAttribute
end
end
# Returns nil when not found. Except that, is same as `.get!`.
# @param [String] key
# @return [ActiveRecord::Base, nil] A shard model instance
def get(key)
raise 'key must be a String' unless key.is_a?(String)
shard_for(key.to_s).find_by(distkey => key)
end
# `.get!` raises MixedGauge::RecordNotFound which is child class of
# `ActiveRecord::RecordNotFound` so you can rescue that exception as same
# as AR's RecordNotFound.
# @param [String] key
# @return [ActiveRecord::Base] A shard model instance
# @raise [MixedGauge::RecordNotFound]
def get!(key)
get(key) || raise(MixedGauge::RecordNotFound)
end
# Register hook to assign auto-generated distkey or something.
# Sometimes you want to generates distkey value before validation. Since
# mixed_gauge generates sub class of your models, AR's callback is not
# usesless for this usecase, so mixed_gauge offers its own callback method.
# @example
# class User
# include MixedGauge::Model
# use_cluster :user
# def_distkey :name
# before_put do |attributes|
# attributes[:name] = generate_name unless attributes[:name]
# end
# end
def before_put(&block)
@before_put_callback = block
end
# Returns a generated model class of included model class which has proper
# connection config for the shard for given key.
# @param [String] key A value of distkey
# @return [Class] A generated model class for given distkey value
def shard_for(key)
connection_name = cluster_routing.route(key.to_s)
shard_repository.fetch(connection_name)
end
# Returns all generated shard model class. Useful to query to all shards.
# @return [Array<Class>] An array of shard models
# @example
# User.all_shards.flat_map {|m| m.find_by(name: 'alice') }.compact
def all_shards
shard_repository.all
end
# @return [Mixedgauge::AllShardsInParallel]
# @example
# User.all_shards_in_parallel.map {|m| m.where.find_by(name: 'Alice') }.compact
def all_shards_in_parallel
AllShardsInParallel.new(all_shards, service: service)
end
alias parallel all_shards_in_parallel
# See example definitions in `spec/models.rb`.
# @param [Symbol] A role name of target cluster.
# @return [Class, Object] if block given then yielded result else
# target shard model.
# @example
# UserReadonly.all_shards.each do |m|
# target_ids = m.where(age: 1).pluck(:id)
# m.switch(:master) do |master|
# master.where(id: target_ids).delete_all
# end
# end
def switch(role_name, &block)
replication_mapping.switch(self, role_name, &block)
end
# Define utility methods which uses all shards or specific shard.
# These methods can be called from included model class.
# @example
# class User
# include MixedGauge::Model
# use_cluster :user
# def_distkey :name
# parent_methods do
# def all_count
# parallel.map {|m| m.count }.reduce(&:+)
# end
#
# def find_from_all_by(condition)
# parallel.flat_map {|m m.find_by(condition) }.compact.first
# end
# end
# end
#
# User.put!(email: 'a@m.com', name: 'a')
# User.put!(email: 'b@m.com', name: 'b')
# User.all_count #=> 2
# User.find_from_all_by(name: 'b') #=> User b
def parent_methods(&block)
instance_eval(&block)
end
end
end
end