# thiagopradi/octopus
#
# View on GitHub
# lib/octopus/proxy.rb
#
# Summary
#
# Maintainability
# C
# 1 day
# Test Coverage
require 'set'
require 'octopus/slave_group'
require 'octopus/load_balancing/round_robin'

module Octopus
  # Stand-in for an ActiveRecord connection. Each call is routed to the shard,
  # shard slave group, slave group, or replicated slave selected by the routing
  # state held in +proxy_config+ (current shard, model, group, slave group and
  # the "block" shard that suppresses proxy cleaning).
  class Proxy
    attr_accessor :proxy_config

    # Routing state and shard configuration are owned by the ProxyConfig instance.
    delegate :current_model, :current_model=,
             :current_shard, :current_shard=,
             :current_group, :current_group=,
             :current_slave_group, :current_slave_group=,
             :current_load_balance_options, :current_load_balance_options=,
             :block, :block=, :fully_replicated?, :has_group?,
             :shard_names, :shards_for_group, :shards, :sharded, :slaves_list,
             :shards_slave_groups, :slave_groups, :replicated, :slaves_load_balancer,
             :config, :initialize_shards, :shard_name, to: :proxy_config, prefix: false

    # @param config Octopus configuration handed to Octopus::ProxyConfig.new
    #   (defaults to Octopus.config).
    def initialize(config = Octopus.config)
      self.proxy_config = Octopus::ProxyConfig.new(config)
    end

    # Rails connection methods that are forwarded unchanged to the currently
    # selected connection. The methods defined after this list are overridden to
    # add the custom behavior Octopus needs for sharding / replication.
    delegate :adapter_name, :add_transaction_record, :case_sensitive_modifier,
      :type_cast, :to_sql, :quote, :quote_column_name, :quote_table_name,
      :quote_table_name_for_assignment, :supports_migrations?, :table_alias_for,
      :table_exists?, :in_clause_length, :supports_ddl_transactions?,
      :sanitize_limit, :prefetch_primary_key?, :current_database,
      :combine_bind_parameters, :empty_insert_statement_value, :assume_migrated_upto_version,
      :schema_cache, :substitute_at, :internal_string_options_for_primary_key, :lookup_cast_type_from_column,
      :supports_advisory_locks?, :get_advisory_lock, :initialize_internal_metadata_table,
      :release_advisory_lock, :prepare_binds_for_database, :cacheable_query, :column_name_for_operation,
      :prepared_statements, :transaction_state, :create_table, to: :select_connection

    # Executes raw SQL. The connection is resolved before the proxy is
    # (possibly) cleaned, so the statement still runs on the shard that was
    # current when #execute was called.
    def execute(sql, name = nil)
      conn = select_connection
      clean_connection_proxy if should_clean_connection_proxy?('execute')
      conn.execute(sql, name)
    end

    # Runs an INSERT on the connection selected before the proxy is cleaned.
    def insert(arel, name = nil, pk = nil, id_value = nil, sequence_name = nil, binds = [])
      conn = select_connection
      clean_connection_proxy if should_clean_connection_proxy?('insert')
      conn.insert(arel, name, pk, id_value, sequence_name, binds)
    end

    # Runs an UPDATE on the connection selected before the proxy is cleaned.
    def update(arel, name = nil, binds = [])
      conn = select_connection
      # Call the legacy should_clean_connection_proxy? method here, emulating an insert.
      clean_connection_proxy if should_clean_connection_proxy?('insert')
      conn.update(arel, name, binds)
    end

    # DELETE goes through the legacy routing (see #legacy_method_missing_logic).
    def delete(*args, &block)
      legacy_method_missing_logic('delete', *args, &block)
    end

    # Routed through the legacy logic so SELECTs may be sent to slaves.
    def select_all(*args, &block)
      legacy_method_missing_logic('select_all', *args, &block)
    end

    # Routed through the legacy logic so SELECTs may be sent to slaves.
    def select_value(*args, &block)
      legacy_method_missing_logic('select_value', *args, &block)
    end

    # Rails 3.1 sets automatic_reconnect to false when it removes a
    # connection pool. Octopus can potentially retain a reference to a closed
    # connection pool. Previously, that would work since the pool would just
    # reconnect, but in Rails 3.1 the flag prevents this.
    #
    # If the pool is not yet connected and the master shard has its query cache
    # enabled, the query cache is enabled on this pool's connection as well.
    #
    # @return the connection checked out from +connection_pool+.
    def safe_connection(connection_pool)
      connection_pool.automatic_reconnect ||= true
      if !connection_pool.connected? && shards[Octopus.master_shard].connection.query_cache_enabled
        connection_pool.connection.enable_query_cache!
      end
      connection_pool.connection
    end

    # Connection for the pool registered under the current +shard_name+.
    def select_connection
      safe_connection(shards[shard_name])
    end

    # Runs the block with +shard+ as the current shard while suppressing proxy
    # cleaning. Returns the block's value.
    def run_queries_on_shard(shard, &_block)
      keeping_connection_proxy(shard) do
        using_shard(shard) do
          yield
        end
      end
    end

    # Runs the block once per shard in +shards+; returns the results as an Array.
    def send_queries_to_multiple_shards(shards, &block)
      shards.map do |shard|
        run_queries_on_shard(shard, &block)
      end
    end

    # Runs the block on every shard of +group+ with +group+ set as current group.
    def send_queries_to_group(group, &block)
      using_group(group) do
        send_queries_to_multiple_shards(shards_for_group(group), &block)
      end
    end

    # Runs the block once per distinct connection pool: shard names that map to
    # the same pool are collapsed so the block does not run twice against it.
    def send_queries_to_all_shards(&block)
      distinct_shards = shard_names.uniq { |name| shards[name] }
      send_queries_to_multiple_shards(distinct_shards, &block)
    end

    # Resets routing state back to the master shard with no model/group/block.
    def clean_connection_proxy
      self.current_shard = Octopus.master_shard
      self.current_model = nil
      self.current_group = nil
      self.block = nil
    end

    # Ensures the schema migrations table exists on +shard+, creating it if not.
    def check_schema_migrations(shard)
      OctopusModel.using(shard).connection.table_exists?(
        ActiveRecord::Migrator.schema_migrations_table_name,
      ) || OctopusModel.using(shard).connection.initialize_schema_migrations_table
    end

    # Opens a transaction on the selected connection. In a non-sharded setup
    # where the current model is replicated, the transaction is pinned to the
    # master shard (with proxy cleaning suppressed) instead.
    def transaction(options = {}, &block)
      if !sharded && current_model_replicated?
        run_queries_on_shard(Octopus.master_shard) do
          select_connection.transaction(options, &block)
        end
      else
        select_connection.transaction(options, &block)
      end
    end

    # Any other connection method is routed through the legacy logic.
    def method_missing(method, *args, &block)
      legacy_method_missing_logic(method, *args, &block)
    end

    # Reports methods of the selected connection as supported, because
    # #method_missing forwards them there. Defining respond_to_missing? (rather
    # than overriding respond_to?) also makes Object#method work for them.
    def respond_to_missing?(method, include_private = false)
      select_connection.respond_to?(method, include_private) || super
    end

    # Pool registered for +current_shard+.
    def connection_pool
      shards[current_shard]
    end

    if Octopus.rails4?
      # Clears, then enables, the query cache on every connected pool.
      def enable_query_cache!
        clear_query_cache
        with_each_healthy_shard { |v| v.connected? && safe_connection(v).enable_query_cache! }
      end

      # Disables the query cache on every connected pool.
      def disable_query_cache!
        with_each_healthy_shard { |v| v.connected? && safe_connection(v).disable_query_cache! }
      end
    end

    # Clears the query cache on every connected pool.
    def clear_query_cache
      with_each_healthy_shard { |v| v.connected? && safe_connection(v).clear_query_cache }
    end

    # Calls release_connection on every pool.
    def clear_active_connections!
      with_each_healthy_shard(&:release_connection)
    end

    # Calls disconnect! on every pool; on Rails >= 5.2 the shards are then rebuilt.
    def clear_all_connections!
      with_each_healthy_shard(&:disconnect!)

      if Octopus.atleast_rails52?
        # On Rails 5.2 it is no longer safe to re-use connection pools after they have been discarded.
        # This happens on webservers with forking, for example Phusion Passenger.
        # Therefore, after we clear all connections we reinitialize the shards to get fresh, non-discarded ConnectionPool objects.
        proxy_config.reinitialize_shards
      end
    end

    # True when at least one shard pool is connected.
    def connected?
      shards.any? { |_k, v| v.connected? }
    end

    # True for slave-eligible methods when a slave group named
    # +current_slave_group+ exists for +current_shard+.
    def should_send_queries_to_shard_slave_group?(method)
      should_use_slaves_for_method?(method) && shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present?
    end

    # Sends the call to the next slave of the current shard's current slave group.
    def send_queries_to_shard_slave_group(method, *args, &block)
      send_queries_to_balancer(shards_slave_groups[current_shard][current_slave_group], method, *args, &block)
    end

    # True for slave-eligible methods when a global slave group named
    # +current_slave_group+ exists.
    def should_send_queries_to_slave_group?(method)
      should_use_slaves_for_method?(method) && slave_groups.try(:[], current_slave_group).present?
    end

    # Sends the call to the next slave of the current global slave group.
    def send_queries_to_slave_group(method, *args, &block)
      send_queries_to_balancer(slave_groups[current_slave_group], method, *args, &block)
    end

    # True when replication is on and either the current model is replicated or
    # the whole setup is fully replicated. Safe when current_model is nil.
    def current_model_replicated?
      replicated && (current_model.try(:replicated) || fully_replicated?)
    end

    # Creates the schema migrations table on the selected connection.
    def initialize_schema_migrations_table
      if Octopus.atleast_rails52?
        select_connection.transaction { ActiveRecord::SchemaMigration.create_table }
      else
        select_connection.initialize_schema_migrations_table
      end
    end

    # Creates the ActiveRecord internal metadata table on the selected connection.
    def initialize_metadata_table
      select_connection.transaction { ActiveRecord::InternalMetadata.create_table }
    end

    protected

    # @thiagopradi - This legacy method missing logic will be kept for a while for compatibility
    # and will be removed when Octopus 1.0 is released.
    # We are planning to migrate to a more stable logic for the Proxy that doesn't require method missing.
    #
    # Routing order: clean the proxy (after resolving the connection), then the
    # shard's slave group, then the global slave group, then a replicated slave,
    # and finally the selected connection. ActiveRecord::Result values are tagged
    # with the shard that produced them.
    def legacy_method_missing_logic(method, *args, &block)
      if should_clean_connection_proxy?(method)
        conn = select_connection
        clean_connection_proxy
        conn.send(method, *args, &block)
      elsif should_send_queries_to_shard_slave_group?(method)
        send_queries_to_shard_slave_group(method, *args, &block)
      elsif should_send_queries_to_slave_group?(method)
        send_queries_to_slave_group(method, *args, &block)
      elsif should_send_queries_to_replicated_databases?(method)
        send_queries_to_selected_slave(method, *args, &block)
      else
        val = select_connection.send(method, *args, &block)

        if val.instance_of? ActiveRecord::Result
          val.current_shard = shard_name
        end

        val
      end
    end

    # Ensure that a single failing slave doesn't take down the entire application.
    #
    # Yields every shard pool, then every other ActiveRecord pool. In a robust
    # environment errors are logged per pool; otherwise they are re-raised.
    def with_each_healthy_shard
      shards.each do |name, pool|
        begin
          yield(pool)
        rescue => e
          if Octopus.robust_environment?
            Octopus.logger.error "Error on shard #{name}: #{e.message}"
          else
            raise
          end
        end
      end

      ar_pools = ActiveRecord::Base.connection_handler.connection_pool_list

      ar_pools.each do |pool|
        # Already handled above; use the configured master shard name rather
        # than a hard-coded :master so the pool is not yielded twice.
        next if pool == shards[Octopus.master_shard]

        begin
          yield(pool)
        rescue => e
          if Octopus.robust_environment?
            Octopus.logger.error "Error on pool (spec: #{pool.spec}): #{e.message}"
          else
            raise
          end
        end
      end
    end

    # Truthy when the method name contains insert/select/execute, the current
    # model is not replicated, and no block pins the proxy to the current shard.
    def should_clean_connection_proxy?(method)
      method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard)
    end

    # Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slave groups are defined.
    def should_send_queries_to_replicated_databases?(method)
      replicated && method.to_s =~ /select/ && !block && !slaves_grouped?
    end

    # Sends the call to the next slave from the load balancer when the current
    # model (or the whole setup) is replicated; otherwise to the master shard.
    def send_queries_to_selected_slave(method, *args, &block)
      # current_model is nil after clean_connection_proxy; use try so a
      # fully replicated setup does not raise NoMethodError on nil.
      selected_slave =
        if current_model.try(:replicated) || fully_replicated?
          slaves_load_balancer.next current_load_balance_options
        else
          Octopus.master_shard
        end

      send_queries_to_slave(selected_slave, method, *args, &block)
    end

    # We should use slaves if and only if it's safe to do so.
    #
    # We can safely use slaves when:
    # (1) `replicated: true` is specified in `shards.yml`
    # (2) The current model is `replicated()`, or `fully_replicated: true` is specified in `shards.yml`, which means that
    #     all models are `replicated()`
    # (3) It's a SELECT query
    # while ensuring that we revert `current_shard` from the selected slave to the (shard's) master
    # so that queries other than SELECT don't leak to the slave.
    def should_use_slaves_for_method?(method)
      current_model_replicated? && method.to_s =~ /select/
    end

    # True when any global slave groups are configured.
    def slaves_grouped?
      slave_groups.present?
    end

    # Temporarily switch `current_shard` to the next slave in a slave group and send queries to it
    # while preserving `current_shard`.
    def send_queries_to_balancer(balancer, method, *args, &block)
      send_queries_to_slave(balancer.next(current_load_balance_options), method, *args, &block)
    end

    # Temporarily switch `current_shard` to the specified slave and send queries to it
    # while preserving `current_shard`. ActiveRecord::Result values are tagged
    # with the slave that produced them.
    def send_queries_to_slave(slave, method, *args, &block)
      using_shard(slave) do
        val = select_connection.send(method, *args, &block)
        if val.instance_of? ActiveRecord::Result
          val.current_shard = slave
        end
        val
      end
    end

    # Temporarily block cleaning connection proxy and run the block.
    #
    # @see Octopus::Proxy#should_clean_connection_proxy?
    # @see Octopus::Proxy#clean_connection_proxy
    def keeping_connection_proxy(shard, &_block)
      last_block = block

      begin
        self.block = shard
        yield
      ensure
        self.block = last_block || nil
      end
    end

    # Temporarily switch `current_shard` and run the block. The switch is
    # skipped when the current model disallows +shard+. Shard, slave group and
    # load-balance options are restored afterwards, even on error.
    def using_shard(shard, &_block)
      older_shard = current_shard
      older_slave_group = current_slave_group
      older_load_balance_options = current_load_balance_options

      begin
        unless current_model && !current_model.allowed_shard?(shard)
          self.current_shard = shard
        end
        yield
      ensure
        self.current_shard = older_shard
        self.current_slave_group = older_slave_group
        self.current_load_balance_options = older_load_balance_options
      end
    end

    # Temporarily switch `current_group` and run the block.
    def using_group(group, &_block)
      older_group = current_group

      begin
        self.current_group = group
        yield
      ensure
        self.current_group = older_group
      end
    end
  end
end