lib/octopus/proxy.rb
require 'set'
require 'octopus/slave_group'
require 'octopus/load_balancing/round_robin'
module Octopus
class Proxy
attr_accessor :proxy_config
delegate :current_model, :current_model=,
:current_shard, :current_shard=,
:current_group, :current_group=,
:current_slave_group, :current_slave_group=,
:current_load_balance_options, :current_load_balance_options=,
:block, :block=, :fully_replicated?, :has_group?,
:shard_names, :shards_for_group, :shards, :sharded, :slaves_list,
:shards_slave_groups, :slave_groups, :replicated, :slaves_load_balancer,
:config, :initialize_shards, :shard_name, to: :proxy_config, prefix: false
def initialize(config = Octopus.config)
self.proxy_config = Octopus::ProxyConfig.new(config)
end
# Rails Connection Methods - Those methods are overriden to add custom behavior that helps
# Octopus introduce Sharding / Replication.
delegate :adapter_name, :add_transaction_record, :case_sensitive_modifier,
:type_cast, :to_sql, :quote, :quote_column_name, :quote_table_name,
:quote_table_name_for_assignment, :supports_migrations?, :table_alias_for,
:table_exists?, :in_clause_length, :supports_ddl_transactions?,
:sanitize_limit, :prefetch_primary_key?, :current_database,
:combine_bind_parameters, :empty_insert_statement_value, :assume_migrated_upto_version,
:schema_cache, :substitute_at, :internal_string_options_for_primary_key, :lookup_cast_type_from_column,
:supports_advisory_locks?, :get_advisory_lock, :initialize_internal_metadata_table,
:release_advisory_lock, :prepare_binds_for_database, :cacheable_query, :column_name_for_operation,
:prepared_statements, :transaction_state, :create_table, to: :select_connection
def execute(sql, name = nil)
conn = select_connection
clean_connection_proxy if should_clean_connection_proxy?('execute')
conn.execute(sql, name)
end
def insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = [])
conn = select_connection
clean_connection_proxy if should_clean_connection_proxy?('insert')
conn.insert(arel, name, pk, id_value, sequence_name, binds)
end
def update(arel, name = nil, binds = [])
conn = select_connection
# Call the legacy should_clean_connection_proxy? method here, emulating an insert.
clean_connection_proxy if should_clean_connection_proxy?('insert')
conn.update(arel, name, binds)
end
def delete(*args, &block)
legacy_method_missing_logic('delete', *args, &block)
end
def select_all(*args, &block)
legacy_method_missing_logic('select_all', *args, &block)
end
def select_value(*args, &block)
legacy_method_missing_logic('select_value', *args, &block)
end
# Rails 3.1 sets automatic_reconnect to false when it removes
# connection pool. Octopus can potentially retain a reference to a closed
# connection pool. Previously, that would work since the pool would just
# reconnect, but in Rails 3.1 the flag prevents this.
def safe_connection(connection_pool)
connection_pool.automatic_reconnect ||= true
if !connection_pool.connected? && shards[Octopus.master_shard].connection.query_cache_enabled
connection_pool.connection.enable_query_cache!
end
connection_pool.connection
end
def select_connection
safe_connection(shards[shard_name])
end
def run_queries_on_shard(shard, &_block)
keeping_connection_proxy(shard) do
using_shard(shard) do
yield
end
end
end
def send_queries_to_multiple_shards(shards, &block)
shards.map do |shard|
run_queries_on_shard(shard, &block)
end
end
def send_queries_to_group(group, &block)
using_group(group) do
send_queries_to_multiple_shards(shards_for_group(group), &block)
end
end
def send_queries_to_all_shards(&block)
send_queries_to_multiple_shards(shard_names.uniq { |shard_name| shards[shard_name] }, &block)
end
def clean_connection_proxy
self.current_shard = Octopus.master_shard
self.current_model = nil
self.current_group = nil
self.block = nil
end
def check_schema_migrations(shard)
OctopusModel.using(shard).connection.table_exists?(
ActiveRecord::Migrator.schema_migrations_table_name,
) || OctopusModel.using(shard).connection.initialize_schema_migrations_table
end
def transaction(options = {}, &block)
if !sharded && current_model_replicated?
run_queries_on_shard(Octopus.master_shard) do
select_connection.transaction(options, &block)
end
else
select_connection.transaction(options, &block)
end
end
def method_missing(method, *args, &block)
legacy_method_missing_logic(method, *args, &block)
end
def respond_to?(method, include_private = false)
super || select_connection.respond_to?(method, include_private)
end
def connection_pool
shards[current_shard]
end
if Octopus.rails4?
def enable_query_cache!
clear_query_cache
with_each_healthy_shard { |v| v.connected? && safe_connection(v).enable_query_cache! }
end
def disable_query_cache!
with_each_healthy_shard { |v| v.connected? && safe_connection(v).disable_query_cache! }
end
end
def clear_query_cache
with_each_healthy_shard { |v| v.connected? && safe_connection(v).clear_query_cache }
end
def clear_active_connections!
with_each_healthy_shard(&:release_connection)
end
def clear_all_connections!
with_each_healthy_shard(&:disconnect!)
if Octopus.atleast_rails52?
# On Rails 5.2 it is no longer safe to re-use connection pools after they have been discarded
# This happens on webservers with forking, for example Phusion Passenger.
# Therefor after we clear all connections we reinitialize the shards to get fresh and not discarded ConnectionPool objects
proxy_config.reinitialize_shards
end
end
def connected?
shards.any? { |_k, v| v.connected? }
end
def should_send_queries_to_shard_slave_group?(method)
should_use_slaves_for_method?(method) && shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present?
end
def send_queries_to_shard_slave_group(method, *args, &block)
send_queries_to_balancer(shards_slave_groups[current_shard][current_slave_group], method, *args, &block)
end
def should_send_queries_to_slave_group?(method)
should_use_slaves_for_method?(method) && slave_groups.try(:[], current_slave_group).present?
end
def send_queries_to_slave_group(method, *args, &block)
send_queries_to_balancer(slave_groups[current_slave_group], method, *args, &block)
end
def current_model_replicated?
replicated && (current_model.try(:replicated) || fully_replicated?)
end
def initialize_schema_migrations_table
if Octopus.atleast_rails52?
select_connection.transaction { ActiveRecord::SchemaMigration.create_table }
else
select_connection.initialize_schema_migrations_table
end
end
def initialize_metadata_table
select_connection.transaction { ActiveRecord::InternalMetadata.create_table }
end
protected
# @thiagopradi - This legacy method missing logic will be keep for a while for compatibility
# and will be removed when Octopus 1.0 will be released.
# We are planning to migrate to a much stable logic for the Proxy that doesn't require method missing.
def legacy_method_missing_logic(method, *args, &block)
if should_clean_connection_proxy?(method)
conn = select_connection
clean_connection_proxy
conn.send(method, *args, &block)
elsif should_send_queries_to_shard_slave_group?(method)
send_queries_to_shard_slave_group(method, *args, &block)
elsif should_send_queries_to_slave_group?(method)
send_queries_to_slave_group(method, *args, &block)
elsif should_send_queries_to_replicated_databases?(method)
send_queries_to_selected_slave(method, *args, &block)
else
val = select_connection.send(method, *args, &block)
if val.instance_of? ActiveRecord::Result
val.current_shard = shard_name
end
val
end
end
# Ensure that a single failing slave doesn't take down the entire application
def with_each_healthy_shard
shards.each do |shard_name, v|
begin
yield(v)
rescue => e
if Octopus.robust_environment?
Octopus.logger.error "Error on shard #{shard_name}: #{e.message}"
else
raise
end
end
end
ar_pools = ActiveRecord::Base.connection_handler.connection_pool_list
ar_pools.each do |pool|
next if pool == shards[:master] # Already handled this
begin
yield(pool)
rescue => e
if Octopus.robust_environment?
Octopus.logger.error "Error on pool (spec: #{pool.spec}): #{e.message}"
else
raise
end
end
end
end
def should_clean_connection_proxy?(method)
method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard)
end
# Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined
def should_send_queries_to_replicated_databases?(method)
replicated && method.to_s =~ /select/ && !block && !slaves_grouped?
end
def send_queries_to_selected_slave(method, *args, &block)
if current_model.replicated || fully_replicated?
selected_slave = slaves_load_balancer.next current_load_balance_options
else
selected_slave = Octopus.master_shard
end
send_queries_to_slave(selected_slave, method, *args, &block)
end
# We should use slaves if and only if its safe to do so.
#
# We can safely use slaves when:
# (1) `replicated: true` is specified in `shards.yml`
# (2) The current model is `replicated()`, or `fully_replicated: true` is specified in `shards.yml` which means that
# all the model is `replicated()`
# (3) It's a SELECT query
# while ensuring that we revert `current_shard` from the selected slave to the (shard's) master
# not to make queries other than SELECT leak to the slave.
def should_use_slaves_for_method?(method)
current_model_replicated? && method.to_s =~ /select/
end
def slaves_grouped?
slave_groups.present?
end
# Temporarily switch `current_shard` to the next slave in a slave group and send queries to it
# while preserving `current_shard`
def send_queries_to_balancer(balancer, method, *args, &block)
send_queries_to_slave(balancer.next(current_load_balance_options), method, *args, &block)
end
# Temporarily switch `current_shard` to the specified slave and send queries to it
# while preserving `current_shard`
def send_queries_to_slave(slave, method, *args, &block)
using_shard(slave) do
val = select_connection.send(method, *args, &block)
if val.instance_of? ActiveRecord::Result
val.current_shard = slave
end
val
end
end
# Temporarily block cleaning connection proxy and run the block
#
# @see Octopus::Proxy#should_clean_connection?
# @see Octopus::Proxy#clean_connection_proxy
def keeping_connection_proxy(shard, &_block)
last_block = block
begin
self.block = shard
yield
ensure
self.block = last_block || nil
end
end
# Temporarily switch `current_shard` and run the block
def using_shard(shard, &_block)
older_shard = current_shard
older_slave_group = current_slave_group
older_load_balance_options = current_load_balance_options
begin
unless current_model && !current_model.allowed_shard?(shard)
self.current_shard = shard
end
yield
ensure
self.current_shard = older_shard
self.current_slave_group = older_slave_group
self.current_load_balance_options = older_load_balance_options
end
end
# Temporarily switch `current_group` and run the block
def using_group(group, &_block)
older_group = current_group
begin
self.current_group = group
yield
ensure
self.current_group = older_group
end
end
end
end