rbuck/keymap

View on GitHub
lib/keymap/connection_adapters/abstract/connection_pool.rb

Summary

Maintainability
A
2 hrs
Test Coverage
require 'thread'
require 'monitor'
require 'set'

module Keymap
  # Raised when a connection could not be obtained within the connection
  # acquisition timeout period.
  class ConnectionTimeoutError < ConnectionNotEstablished
  end

  module ConnectionAdapters
    # Connection pool base class for managing connections.
    #
    # == Introduction
    #
    # A connection pool synchronizes thread access to a limited number of
    # connections. The basic idea is that each thread checks out a
    # connection from the pool, uses that connection, and checks the
    # connection back in. ConnectionPool is completely thread-safe, and will
    # ensure that a connection cannot be used by two threads at the same time,
    # as long as ConnectionPool's contract is correctly followed. It will also
    # handle cases in which there are more threads than connections: if all
    # connections have been checked out, and a thread tries to checkout a
    # connection anyway, then ConnectionPool will wait until some other thread
    # has checked in a connection.
    #
    # == Obtaining (checking out) a connection
    #
    # Connections can be obtained and used from a connection pool in several
    # ways:
    #
    # 1. Simply use Keymap::Base.connection. Eventually, when you're done with
    #    the connection(s) and wish it to be returned to the pool, you call
    #    Keymap::Base.clear_active_connections!.
    # 2. Manually check out a connection from the pool with
    #    Keymap::Base.connection_pool.checkout. You are responsible for
    #    returning this connection to the pool when finished by calling
    #    Keymap::Base.connection_pool.checkin(connection).
    # 3. Use Keymap::Base.connection_pool.with_connection(&block), which
    #    obtains a connection, yields it as the sole argument to the block,
    #    and returns it to the pool after the block completes.
    #
    # Connections in the pool are actually AbstractAdapter objects (or objects
    # compatible with AbstractAdapter's interface).
    #
    # == Options
    #
    # There are two connection-pooling-related options that you can add to
    # your database connection configuration:
    #
    # * +pool+: number indicating size of connection pool (default 5)
    # * +checkout _timeout+: number of seconds to block and wait for a 
    #   connection before giving up and raising a timeout error 
    #   (default 5 seconds).
    class ConnectionPool
      include MonitorMixin

      attr_accessor :automatic_reconnect
      attr_reader :spec, :connections

      # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
      # object which describes database connection information (e.g. adapter,
      # host name, username, password, etc), as well as the maximum size for
      # this ConnectionPool.
      #
      # The default ConnectionPool maximum size is 5.
      def initialize(spec)
        super()

        @spec = spec

        # The cache of reserved connections mapped to threads
        @reserved_connections = {}

        @queue = new_cond
        # 'wait_timeout', the backward-compatible key, conflicts with spec key 
        # used by mysql2 for something entirely different, checkout_timeout
        # preferred to avoid conflict and allow independent values. 
        @timeout = spec.config[:checkout_timeout] || spec.config[:wait_timeout] || 5

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        @connections = []
        @automatic_reconnect = true
      end

      # Retrieve the connection associated with the current thread, or call
      # #checkout to obtain one if necessary.
      #
      # #connection can be called any number of times; the connection is
      # held in a hash keyed by the thread id.
      def connection
        synchronize do
          @reserved_connections[current_connection_id] ||= checkout
        end
      end

      # Is there an open connection that is being used for the current thread?
      def active_connection?
        synchronize do
          @reserved_connections.fetch(current_connection_id) {
            return false
          }.in_use?
        end
      end

      # Signal that the thread is finished with the current connection.
      # #release_connection releases the connection-thread association
      # and returns the connection to the pool.
      def release_connection(with_id = current_connection_id)
        conn = synchronize { @reserved_connections.delete(with_id) }
        checkin conn if conn
      end

      # If a connection already exists yield it to the block. If no connection
      # exists checkout a connection, yield it to the block, and checkin the
      # connection when finished.
      def with_connection
        fresh_connection = false
        connection_id = current_connection_id
        fresh_connection = true unless active_connection?
        yield connection
      ensure
        release_connection(connection_id) if fresh_connection
      end

      # Returns true if a connection has already been opened.
      def connected?
        synchronize { @connections.any? }
      end

      # Disconnects all connections in the pool, and clears the pool.
      def disconnect!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect!
          end
          @connections = []
        end
      end

      # Clears the cache which maps classes.
      def clear_reloadable_connections!
        synchronize do
          @reserved_connections = {}
          @connections.each do |conn|
            checkin conn
            conn.disconnect! if conn.requires_reloading?
          end
          @connections.delete_if do |conn|
            conn.requires_reloading?
          end
        end
      end

      # Verify active connections and remove and disconnect connections
      # associated with stale threads.
      def verify_active_connections! #:nodoc:
        synchronize do
          clear_stale_cached_connections!
          @connections.each do |connection|
            connection.verify!
          end
        end
      end

      # Return any checked-out connections back to the pool by threads that
      # are no longer alive.
      def clear_stale_cached_connections!
        keys = @reserved_connections.keys - Thread.list.find_all { |t|
          t.alive?
        }.map { |thread| thread.object_id }
        keys.each do |key|
          conn = @reserved_connections[key]
          ActiveSupport::Deprecation.warn(<<-eowarn) if conn.in_use?
Database connections will not be closed automatically, please close your
database connection at the end of the thread by calling `close` on your
connection.  For example: Keymap::Base.connection.close
          eowarn
          checkin conn
          @reserved_connections.delete(key)
        end
      end

      # Check-out a database connection from the pool, indicating that you want
      # to use it. You should call #checkin when you no longer need this.
      #
      # This is done by either returning an existing connection, or by creating
      # a new connection. If the maximum number of connections for this pool has
      # already been reached, but the pool is empty (i.e. they're all being used),
      # then this method will wait until a thread has checked in a connection.
      # The wait time is bounded however: if no connection can be checked out
      # within the timeout specified for this pool, then a ConnectionTimeoutError
      # exception will be raised.
      #
      # Returns: an AbstractAdapter object.
      #
      # Raises:
      # - ConnectionTimeoutError: no connection can be obtained from the pool
      #   within the timeout period.
      def checkout
        synchronize do
          waited_time = 0

          loop do
            conn = @connections.find { |c| c.lease }

            unless conn
              if @connections.size < @size
                conn = checkout_new_connection
                conn.lease
              end
            end

            if conn
              checkout_and_verify conn
              return conn
            end

            if waited_time >= @timeout
              raise ConnectionTimeoutError, "could not obtain a database connection#{" within #@timeout seconds" if @timeout} (waited #{waited_time} seconds). The max pool size is currently #@size; consider increasing it."
            end

            # Sometimes our wait can end because a connection is available,
            # but another thread can snatch it up first. If timeout hasn't
            # passed but no connection is avail, looks like that happened --
            # loop and wait again, for the time remaining on our timeout. 
            before_wait = Time.now
            @queue.wait([@timeout - waited_time, 0].max)
            waited_time += (Time.now - before_wait)

            # Will go away in Rails 4, when we don't clean up
            # after leaked connections automatically anymore. Right now, clean
            # up after we've returned from a 'wait' if it looks like it's
            # needed, then loop and try again. 
            if active_connections.size >= @connections.size
              clear_stale_cached_connections!
            end
          end
        end
      end

      # Check-in a database connection back into the pool, indicating that you
      # no longer need this connection.
      #
      # +conn+: an AbstractAdapter object, which was obtained by earlier by
      # calling +checkout+ on this pool.
      def checkin(conn)
        synchronize do
          conn.run_callbacks :checkin do
            conn.expire
            @queue.signal
          end

          release conn
        end
      end

      private

      def release(conn)
        synchronize do
          if @reserved_connections[current_connection_id] == conn
            thread_id = current_connection_id
          else
            thread_id = @reserved_connections.keys.find { |k|
              @reserved_connections[k] == conn
            }
          end
          @reserved_connections.delete thread_id if thread_id
        end
      end

      def new_connection
        Keymap::Base.send(spec.adapter_method, spec.config)
      end

      def current_connection_id #:nodoc:
        Keymap::Base.connection_id ||= Thread.current.object_id
      end

      def checkout_new_connection
        raise ConnectionNotEstablished unless @automatic_reconnect

        c = new_connection
        c.pool = self
        @connections << c
        c
      end

      def checkout_and_verify(c)
        c.run_callbacks :checkout do
          c.verify!
        end
        c
      end

      def active_connections
        @connections.find_all { |c| c.in_use? }
      end
    end

    # ConnectionHandler is a collection of ConnectionPool objects. It is used
    # for keeping separate connection pools for Keymap objects stored in
    # different databases.
    #
    # Normally there is only a single ConnectionHandler instance, accessible via
    # Keymap::Base.connection_handler.
    class ConnectionHandler
      attr_reader :connection_pools

      def initialize(pools = {})
        @connection_pools = pools
        @class_to_pool = {}
      end

      def establish_connection(name, spec)
        @connection_pools[spec] ||= ConnectionAdapters::ConnectionPool.new(spec)
        @class_to_pool[name] = @connection_pools[spec]
      end

      # Returns true if there are any active connections among the connection
      # pools that the ConnectionHandler is managing.
      def active_connections?
        connection_pools.values.any? { |pool| pool.active_connection? }
      end

      # Returns any connections in use by the current thread back to the pool.
      def clear_active_connections!
        @connection_pools.each_value { |pool| pool.release_connection }
      end

      # Clears the cache which maps classes.
      def clear_reloadable_connections!
        @connection_pools.each_value { |pool| pool.clear_reloadable_connections! }
      end

      def clear_all_connections!
        @connection_pools.each_value { |pool| pool.disconnect! }
      end

      # Verify active connections.
      def verify_active_connections! #:nodoc:
        @connection_pools.each_value { |pool| pool.verify_active_connections! }
      end

      # Locate the connection of the nearest super class. This can be an
      # active or defined connection: if it is the latter, it will be
      # opened and set as the active connection for the class it was defined
      # for (not necessarily the current class).
      def retrieve_connection(klass) #:nodoc:
        pool = retrieve_connection_pool(klass)
        (pool && pool.connection) or raise ConnectionNotEstablished
      end

      # Returns true if a connection that's accessible to this class has
      # already been opened.
      def connected?(klass)
        conn = retrieve_connection_pool(klass)
        conn && conn.connected?
      end

      # Remove the connection for this class. This will close the active
      # connection and the defined connection (if they exist). The result
      # can be used as an argument for establish_connection, for easily
      # re-establishing the connection.
      def remove_connection(klass)
        pool = @class_to_pool.delete(klass.name)
        return nil unless pool

        @connection_pools.delete pool.spec
        pool.automatic_reconnect = false
        pool.disconnect!
        pool.spec.config
      end

      def retrieve_connection_pool(klass)
        pool = @class_to_pool[klass.name]
        return pool if pool
        return nil if Keymap::Base == klass
        retrieve_connection_pool klass.superclass
      end
    end

    class ConnectionManagement
      class Proxy # :nodoc:
        attr_reader :body, :testing

        def initialize(body, testing = false)
          @body = body
          @testing = testing
        end

        def method_missing(method_sym, *arguments, &block)
          @body.send(method_sym, *arguments, &block)
        end

        def respond_to?(method_sym, include_private = false)
          @body.respond_to?(method_sym)
        end

        def each(&block)
          body.each(&block)
        end

        def close
          body.close if body.respond_to?(:close)

          # Don't return connection (and perform implicit rollback) if
          # this request is a part of integration test
          Keymap::Base.clear_active_connections! unless testing
        end
      end

      def initialize(app)
        @app = app
      end

      def call(env)
        testing = env.key?('rack.test')

        status, headers, body = @app.call(env)

        [status, headers, Proxy.new(body, testing)]
      rescue
        Keymap::Base.clear_active_connections! unless testing
        raise
      end
    end
  end
end