lib/mongo/server/connection_pool.rb
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Server
# Represents a connection pool for server connections.
#
# @since 2.0.0, largely rewritten in 2.9.0
class ConnectionPool
include Loggable
include Monitoring::Publishable
extend Forwardable
# The default max size for the connection pool.
#
# @since 2.9.0
DEFAULT_MAX_SIZE = 20
# The default min size for the connection pool.
#
# @since 2.9.0
DEFAULT_MIN_SIZE = 0
# The default maximum number of connections that can be connecting at
# any given time.
DEFAULT_MAX_CONNECTING = 2
# The default timeout, in seconds, to wait for a connection.
#
# This timeout applies while in-flow threads are waiting for background
# threads to establish connections (and hence they must connect, handshake
# and auth in the allotted time).
#
# It is currently set to 10 seconds. The default connect timeout is
# 10 seconds by itself, but setting large timeouts can get applications
# in trouble if their requests get timed out by the reverse proxy,
# thus anything over 15 seconds is potentially dangerous.
#
# Integers are always frozen in Ruby, so no explicit freeze is needed.
#
# @since 2.9.0
DEFAULT_WAIT_TIMEOUT = 10
# Condition variable broadcast when the size of the pool changes
# to wake up the populator
attr_reader :populate_semaphore
# Create the new connection pool.
#
# @param [ Server ] server The server which this connection pool is for.
# @param [ Hash ] options The connection pool options.
#
# @option options [ Integer ] :max_size The maximum pool size. Setting
# this option to zero creates an unlimited connection pool.
# @option options [ Integer ] :max_connecting The maximum number of
# connections that can be connecting simultaneously. The default is 2.
# This option should be increased if there are many threads that share
# same connection pool and the application is experiencing timeouts
# while waiting for connections to be established.
# @option options [ Integer ] :max_pool_size Deprecated.
# The maximum pool size. If max_size is also given, max_size and
# max_pool_size must be identical.
# @option options [ Integer ] :min_size The minimum pool size.
# @option options [ Integer ] :min_pool_size Deprecated.
# The minimum pool size. If min_size is also given, min_size and
# min_pool_size must be identical.
# @option options [ Float ] :wait_timeout The time to wait, in
# seconds, for a free connection.
# @option options [ Float ] :wait_queue_timeout Deprecated.
# Alias for :wait_timeout. If both wait_timeout and wait_queue_timeout
# are given, their values must be identical.
# @option options [ Float ] :max_idle_time The time, in seconds,
# after which idle connections should be closed by the pool.
# @option options [ true, false ] :populator_io For internal driver
# use only. Set to false to prevent the populator threads from being
# created and started in the server's connection pool. It is intended
# for use in tests that also turn off monitoring_io, unless the populator
# is explicitly needed. If monitoring_io is off, but the populator_io
# is on, the populator needs to be manually closed at the end of the
# test, since a cluster without monitoring is considered not connected,
# and thus will not clean up the connection pool populator threads on
# close.
# Note: Additionally, options for connections created by this pool should
# be included in the options passed here, and they will be forwarded to
# any connections created by the pool.
#
# @since 2.0.0, API changed in 2.9.0
def initialize(server, options = {})
  unless server.is_a?(Server)
    raise ArgumentError, 'First argument must be a Server instance'
  end

  # Work on a private copy so the caller's hash is never modified.
  options = options.dup

  # A deprecated option may accompany its replacement only when both
  # carry the same value.
  if options[:min_size] && options[:min_pool_size] && options[:min_size] != options[:min_pool_size]
    raise ArgumentError, "Min size #{options[:min_size]} is not identical to min pool size #{options[:min_pool_size]}"
  end
  if options[:max_size] && options[:max_pool_size] && options[:max_size] != options[:max_pool_size]
    raise ArgumentError, "Max size #{options[:max_size]} is not identical to max pool size #{options[:max_pool_size]}"
  end
  if options[:wait_timeout] && options[:wait_queue_timeout] && options[:wait_timeout] != options[:wait_queue_timeout]
    raise ArgumentError, "Wait timeout #{options[:wait_timeout]} is not identical to wait queue timeout #{options[:wait_queue_timeout]}"
  end

  # Fold the deprecated option names into the current ones.
  options[:min_size] ||= options[:min_pool_size]
  options.delete(:min_pool_size)
  options[:max_size] ||= options[:max_pool_size]
  options.delete(:max_pool_size)

  # A max_size of zero means "unlimited", so it never conflicts with
  # min_size.
  if options[:min_size] && options[:max_size] &&
     (options[:max_size] != 0 && options[:min_size] > options[:max_size])
    raise ArgumentError, "Cannot have min size #{options[:min_size]} exceed max size #{options[:max_size]}"
  end

  if options[:wait_queue_timeout]
    options[:wait_timeout] ||= options[:wait_queue_timeout]
  end
  options.delete(:wait_queue_timeout)

  @server = server
  @options = options.freeze

  @generation_manager = GenerationManager.new(server: server)
  # The pool starts out paused (neither ready nor closed).
  @ready = false
  @closed = false

  # A connection owned by this pool should be either in the
  # available connections array (which is used as a stack)
  # or in the checked out connections set.
  @available_connections = []
  @checked_out_connections = Set.new
  @pending_connections = Set.new
  @interrupt_connections = []

  # Mutex used for synchronizing access to @available_connections and
  # @checked_out_connections. The pool object is thread-safe, thus
  # all methods that retrieve or modify instance variables generally
  # must do so under this lock.
  @lock = Mutex.new

  # Background thread responsible for maintaining the size of
  # the pool to at least min_size.
  @populator = Populator.new(self, options)
  @populate_semaphore = Semaphore.new

  # Condition variable to enforce the first check in check_out: max_pool_size.
  # This condition variable should be signaled when the number of
  # unavailable connections decreases (connection_requests +
  # pending_connections + checked_out_connections).
  @size_cv = Mongo::ConditionVariable.new(@lock)
  # This represents the number of threads that have made it past the size_cv
  # gate but have not acquired a connection to add to the pending_connections
  # set.
  @connection_requests = 0

  # Condition variable to enforce the second check in check_out: max_connecting.
  # This condition variable should be signaled when the number of pending
  # connections decreases.
  @max_connecting_cv = Mongo::ConditionVariable.new(@lock)
  @max_connecting = options.fetch(:max_connecting, DEFAULT_MAX_CONNECTING)

  # The finalizer is built by a class method and receives only the
  # collections and the populator, never the pool itself.
  ObjectSpace.define_finalizer(self, self.class.finalize(@available_connections, @pending_connections, @populator))

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolCreated.new(@server.address, options, self)
  )
end
# @return [ Hash ] options The pool options.
attr_reader :options
# @api private
attr_reader :server
# @api private
def_delegators :server, :address
# Get the maximum size of the connection pool.
#
# @return [ Integer ] The maximum size of the connection pool.
#
# @since 2.9.0
def max_size
  return @max_size if @max_size

  # Without an explicit option, fall back to the default, but never
  # below the configured minimum.
  @max_size = options[:max_size] || [DEFAULT_MAX_SIZE, min_size].max
end
# Get the minimum size of the connection pool.
#
# @return [ Integer ] The minimum size of the connection pool.
#
# @since 2.9.0
def min_size
  return @min_size if @min_size

  # Use the configured minimum when present, the default otherwise.
  @min_size = options[:min_size] || DEFAULT_MIN_SIZE
end
# The time to wait, in seconds, for a connection to become available.
#
# @return [ Float ] The queue wait timeout.
#
# @since 2.9.0
def wait_timeout
  return @wait_timeout if @wait_timeout

  # Use the configured timeout when present, the default otherwise.
  @wait_timeout = options[:wait_timeout] || DEFAULT_WAIT_TIMEOUT
end
# The maximum seconds a socket can remain idle since it has been
# checked in to the pool, if set.
#
# @return [ Float | nil ] The max socket idle time in seconds.
#
# @since 2.9.0
def max_idle_time
  # Re-read the option until a truthy value has been cached.
  @max_idle_time = options[:max_idle_time] unless @max_idle_time
  @max_idle_time
end
# @api private
attr_reader :generation_manager
# @return [ Integer ] generation Generation of connections currently
# being used by the queue.
#
# @api private
def_delegators :generation_manager, :generation, :generation_unlocked
# A connection pool is paused if it is not closed and it is not ready.
#
# @return [ true | false ] whether the connection pool is paused.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
def paused?
  raise_if_closed!
  # Read the ready flag under the lock and invert it.
  is_ready = @lock.synchronize { @ready }
  !is_ready
end
# Size of the connection pool.
#
# Includes available and checked out connections.
#
# @return [ Integer ] Size of the connection pool.
#
# @since 2.9.0
def size
  raise_if_closed!
  # The count is taken while holding the pool lock.
  @lock.synchronize { unsynchronized_size }
end
# Returns the size of the connection pool without acquiring the lock.
# This method should only be used by other pool methods when they are
# already holding the lock as Ruby does not allow a thread holding a
# lock to acquire this lock again.
def unsynchronized_size
  # Total of available, checked out and pending connections.
  [@available_connections, @checked_out_connections, @pending_connections].sum(&:length)
end
private :unsynchronized_size
# @return [ Integer ] The number of unavailable connections in the pool.
# Used to calculate whether we have hit max_pool_size.
#
# @api private
def unavailable_connections
  # Checked out and pending connections, plus in-flight connection requests.
  held = @checked_out_connections.length + @pending_connections.length
  held + @connection_requests
end
# Number of available connections in the pool.
#
# @return [ Integer ] Number of available connections.
#
# @since 2.9.0
def available_count
  raise_if_closed!
  # The count is taken while holding the pool lock.
  @lock.synchronize { @available_connections.length }
end
# Whether the pool has been closed.
#
# @return [ true | false ] Whether the pool is closed.
#
# @since 2.9.0
def closed?
  # Coerce the flag to a strict boolean.
  @closed ? true : false
end
# Whether the pool is ready.
#
# @return [ true | false ] Whether the pool is ready.
def ready?
  # The flag is read while holding the pool lock.
  @lock.synchronize { @ready }
end
# @note This method is experimental and subject to change.
#
# @api experimental
# @since 2.11.0
def summary
  @lock.synchronize do
    # Closed wins over ready; a pool that is neither is paused.
    state =
      if closed?
        'closed'
      elsif @ready
        'ready'
      else
        'paused'
      end

    counts = "used=#{@checked_out_connections.length} " \
             "avail=#{@available_connections.length} " \
             "pending=#{@pending_connections.length}"

    "#<ConnectionPool size=#{unsynchronized_size} (#{min_size}-#{max_size}) " +
      "#{counts} #{state}>"
  end
end
# @since 2.9.0
def_delegators :@server, :monitoring
# @api private
attr_reader :populator
# @api private
attr_reader :max_connecting
# Checks a connection out of the pool.
#
# If there are active connections in the pool, the most recently used
# connection is returned. Otherwise if the connection pool size is less
# than the max size, creates a new connection and returns it. Otherwise
# waits up to the wait timeout and raises Timeout::Error if there are
# still no active connections and the pool is at max size.
#
# The returned connection counts toward the pool's max size. When the
# caller is finished using the connection, the connection should be
# checked back in via the check_in method.
#
# @return [ Mongo::Server::Connection ] The checked out connection.
# @raise [ Error::PoolClosedError ] If the pool has been closed.
# @raise [ Timeout::Error ] If the connection pool is at maximum size
# and remains so for longer than the wait timeout.
#
# @since 2.9.0
def check_out(connection_global_id: nil)
  check_invariants

  started = Monitoring::Event::Cmap::ConnectionCheckOutStarted.new(@server.address)
  publish_cmap_event(started)

  # Fail fast when the pool cannot hand out connections at all.
  raise_if_pool_closed!
  raise_if_pool_paused_locked!

  retrieve_and_connect_connection(connection_global_id).tap do |checked_out|
    publish_cmap_event(
      Monitoring::Event::Cmap::ConnectionCheckedOut.new(@server.address, checked_out.id, self),
    )

    # In lint mode, verify the pool never hands out a disconnected connection.
    if Lint.enabled? && !checked_out.connected?
      raise Error::LintError, "Connection pool for #{address} checked out a disconnected connection #{checked_out.generation}:#{checked_out.id}"
    end
  end
ensure
  check_invariants
end
# Check a connection back into the pool.
#
# The connection must have been previously created by this pool.
#
# @param [ Mongo::Server::Connection ] connection The connection.
#
# @since 2.9.0
def check_in(connection)
  check_invariants
  # The actual check-in runs while holding the pool lock.
  @lock.synchronize { do_check_in(connection) }
ensure
  check_invariants
end
# Executes the check in after having already acquired the lock.
#
# @param [ Mongo::Server::Connection ] connection The connection.
def do_check_in(connection)
# When a connection is interrupted it is checked back into the pool
# and closed. The operation that was using the connection before it was
# interrupted will attempt to check it back into the pool, and we
# should ignore it since its already been closed and removed from the pool.
return if connection.closed? && connection.interrupted?
# Reject connections that belong to a different pool.
unless connection.connection_pool == self
raise ArgumentError, "Trying to check in a connection which was not checked out by this pool: #{connection} checked out from pool #{connection.connection_pool} (for #{self})"
end
# Reject connections that this pool does not consider checked out.
unless @checked_out_connections.include?(connection)
raise ArgumentError, "Trying to check in a connection which is not currently checked out by this pool: #{connection} (for #{self})"
end
# Note: if an event handler raises, resource will not be signaled.
# This means threads waiting for a connection to free up when
# the pool is at max size may time out.
# Threads that begin waiting after this method completes (with
# the exception) should be fine.
@checked_out_connections.delete(connection)
# One fewer checked out connection: wake a waiter on the size condition.
@size_cv.signal
publish_cmap_event(
Monitoring::Event::Cmap::ConnectionCheckedIn.new(@server.address, connection.id, self)
)
# The three early exits below disconnect the connection and return
# without making it available again.
if connection.interrupted?
connection.disconnect!(reason: :stale)
return
end
if connection.error?
connection.disconnect!(reason: :error)
return
end
if closed?
connection.disconnect!(reason: :pool_closed)
return
end
if connection.closed?
# Connection was closed - for example, because it experienced
# a network error. Nothing else needs to be done here.
@populate_semaphore.signal
elsif connection.generation != generation(service_id: connection.service_id) && !connection.pinned?
# If connection is marked as pinned, it is used by a transaction
# or a series of cursor operations in a load balanced setup.
# In this case connection should not be disconnected until
# unpinned.
connection.disconnect!(reason: :stale)
@populate_semaphore.signal
else
# Healthy, current-generation (or pinned) connection: record the
# check-in and make it available again.
connection.record_checkin!
@available_connections << connection
@max_connecting_cv.signal
end
end
# Mark the connection pool as paused.
def pause
  raise_if_closed!
  check_invariants
  # The state change itself happens while holding the pool lock.
  @lock.synchronize { do_pause }
ensure
  check_invariants
end
# Mark the connection pool as paused without acquiring the lock.
#
# @api private
def do_pause
  # In lint mode, pausing is only legal for a server marked unknown.
  if Lint.enabled?
    unless @server.unknown?
      raise Error::LintError, "Attempting to pause pool for server #{@server.summary} which is known"
    end
  end

  # Nothing to do when the pool is already paused.
  return unless @ready

  @ready = false
end
# Closes all idle connections in the pool and schedules currently checked
# out connections to be closed when they are checked back into the pool.
# The pool is paused, it will not create new connections in background
# and it will fail checkout requests until marked ready.
#
# @option options [ true | false ] :lazy If true, do not close any of
# the idle connections and instead let them be closed during a
# subsequent check out operation. Defaults to false.
# @option options [ true | false ] :interrupt_in_use_connections If true,
# schedule all checked out and pending connections of previous generations
# for interruption. If false or omitted, such connections are not interrupted.
# @option options [ Object ] :service_id Clear connections with
# the specified service id only.
#
# @return [ true ] true.
#
# @since 2.1.0
def clear(options = nil)
  raise_if_closed!

  # In lint mode, clearing is only legal for a server marked unknown.
  if Lint.enabled?
    unless @server.unknown?
      raise Error::LintError, "Attempting to clear pool for server #{@server.summary} which is known"
    end
  end

  do_clear(options)
end
# Disconnects the pool.
#
# Does everything that +clear+ does, except if the pool is closed
# this method does nothing but +clear+ would raise PoolClosedError.
#
# @since 2.1.0
# @api private
def disconnect!(options = nil)
  begin
    do_clear(options)
  rescue Error::PoolClosedError
    # The "disconnected" state is between closed and paused.
    # When we are trying to disconnect the pool, permit the pool to be
    # already closed.
    nil
  end
end
# Shared implementation of +clear+ and +disconnect!+.
#
# Bumps the generation, closes available connections unless :lazy is
# given, schedules in-use connections for interruption when
# :interrupt_in_use_connections is given, publishes a PoolCleared event
# if the pool was ready, and wakes up all waiting threads as well as the
# populator.
#
# @param [ Hash | nil ] options The options; the keys read here are
#   :service_id, :lazy and :interrupt_in_use_connections.
#
# @return [ true ] true.
def do_clear(options = nil)
check_invariants
service_id = options && options[:service_id]
@lock.synchronize do
# Generation must be bumped before emitting pool cleared event.
@generation_manager.bump(service_id: service_id)
unless options && options[:lazy]
close_available_connections(service_id)
end
# Interruption is opt-in: it only happens when the option is truthy.
if options && options[:interrupt_in_use_connections]
schedule_for_interruption(@checked_out_connections, service_id)
schedule_for_interruption(@pending_connections, service_id)
end
# The event is only published when the pool was ready.
if @ready
publish_cmap_event(
Monitoring::Event::Cmap::PoolCleared.new(
@server.address,
service_id: service_id,
interrupt_in_use_connections: options&.[](:interrupt_in_use_connections)
)
)
# Only pause the connection pool if the server was marked unknown,
# otherwise, allow the retry to be attempted with a ready pool.
do_pause if !@server.load_balancer? && @server.unknown?
end
# Broadcast here to cause all of the threads waiting on the max
# connecting to break out of the wait loop and error.
@max_connecting_cv.broadcast
# Broadcast here to cause all of the threads waiting on the pool size
# to break out of the wait loop and error.
@size_cv.broadcast
end
# "Schedule the background thread" after clearing. This is responsible
# for cleaning up stale threads, and interrupting in use connections.
@populate_semaphore.signal
true
ensure
check_invariants
end
# Instructs the pool to create and return connections.
def ready
raise_if_closed!
# TODO: Add this back in RUBY-3174.
# if Lint.enabled?
# unless @server.connected?
# raise Error::LintError, "Attempting to ready a pool for server #{@server.summary} which is disconnected"
# end
# end
@lock.synchronize do
# Already ready: return from the method without publishing an event
# or touching the populator.
return if @ready
@ready = true
end
# Note that the CMAP spec demands serialization of CMAP events for a
# pool. In order to implement this, event publication must be done into
# a queue which is synchronized, instead of subscribers being invoked
# from the trigger method like this one here inline. On MRI, assuming
# the threads yield to others when they stop having work to do, it is
# likely that the events would in practice always be published in the
# required order. JRuby, being truly concurrent with OS threads,
# would not offer such a guarantee.
publish_cmap_event(
Monitoring::Event::Cmap::PoolReady.new(@server.address, options, self)
)
# The populator is only used when populator_io is enabled (the default).
if options.fetch(:populator_io, true)
if @populator.running?
# Populator already running: signal the semaphore it is woken by.
@populate_semaphore.signal
else
@populator.run!
end
end
end
# Marks the pool closed, closes all idle connections in the pool and
# schedules currently checked out connections to be closed when they are
# checked back into the pool. If force option is true, checked out
# connections are also closed. Attempts to use the pool after it is closed
# will raise Error::PoolClosedError.
#
# @option options [ true | false ] :force Also close all checked out
# connections.
# @option options [ true | false ] :stay_ready For internal driver use
# only. Whether or not to mark the pool as closed.
#
# @return [ true ] Always true.
#
# @since 2.9.0
def close(options = nil)
  # Closing an already closed pool is a no-op, but still honours the
  # documented "always true" return value.
  return true if closed?

  options ||= {}

  stop_populator

  @lock.synchronize do
    # Disconnect every idle connection.
    until @available_connections.empty?
      connection = @available_connections.pop
      connection.disconnect!(reason: :pool_closed)
    end

    # With :force, checked out connections are disconnected as well.
    if options[:force]
      until @checked_out_connections.empty?
        connection = @checked_out_connections.take(1).first
        connection.disconnect!(reason: :pool_closed)
        @checked_out_connections.delete(connection)
      end
    end

    # With :stay_ready, the closed and ready flags are left untouched.
    unless options[:stay_ready]
      # mark pool as closed before releasing lock so
      # no connections can be created, checked in, or checked out
      @closed = true
      @ready = false
    end

    # Wake up every thread waiting on either condition variable.
    @max_connecting_cv.broadcast
    @size_cv.broadcast
  end

  publish_cmap_event(
    Monitoring::Event::Cmap::PoolClosed.new(@server.address, self)
  )

  true
end
# Get a pretty printed string inspection for the pool.
#
# @example Inspect the pool.
# pool.inspect
#
# @return [ String ] The pool inspection.
#
# @since 2.0.0
def inspect
  # Only a ready pool reports live counts; closed and paused pools
  # report just their state.
  status =
    if closed?
      'closed'
    elsif !ready?
      'paused'
    else
      "current_size=#{size} available=#{available_count}"
    end

  "#<Mongo::Server::ConnectionPool:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
    "wait_timeout=#{wait_timeout} #{status}>"
end
# Yield the block to a connection, while handling check in/check out logic.
#
# @example Execute with a connection.
# pool.with_connection do |connection|
# connection.read
# end
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_connection(connection_global_id: nil)
  raise_if_closed!

  connection = check_out(connection_global_id: connection_global_id)
  yield(connection)
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => network_error
  # Either re-raises the original error or replaces it with a pool
  # cleared error, depending on whether the connection was interrupted.
  maybe_raise_pool_cleared!(connection, network_error)
ensure
  # Whatever happened, a connection that was checked out goes back.
  check_in(connection) if connection
end
# Close sockets that have been open for longer than the max idle time,
# if the option is set.
#
# @since 2.5.0
def close_idle_sockets
  return if closed?
  return unless max_idle_time

  @lock.synchronize do
    @available_connections.delete_if do |idle_candidate|
      checked_in_at = idle_candidate.last_checkin
      # Connections without a recorded check-in are left alone.
      next false unless checked_in_at
      next false unless (Time.now - checked_in_at) > max_idle_time

      idle_candidate.disconnect!(reason: :idle)
      # Wake the populator now that the pool has shrunk.
      @populate_semaphore.signal
      true
    end
    nil
  end
end
# Stop the background populator thread and clean up any connections created
# which have not been connected yet.
#
# Used when closing the pool or when terminating the bg thread for testing
# purposes. In the latter case, this method must be called before the pool
# is used, to ensure no connections in pending_connections were created in-flow
# by the check_out method.
#
# @api private
def stop_populator
  @populator.stop!

  # If stop_populator is called while populate is running, there may be
  # connections waiting to be connected, connections which have not yet
  # been moved to available_connections, or connections moved to available_connections
  # but not deleted from pending_connections. These should be cleaned up.
  @lock.synchronize { clear_pending_connections }
end
# This method does three things:
# 1. Creates and adds a connection to the pool, if the pool's size is
# below min_size. Retries once if a socket-related error is
# encountered during this process and raises if a second error or a
# non socket-related error occurs.
# 2. Removes stale connections from the connection pool.
# 3. Interrupts connections marked for interruption.
#
# Used by the pool populator background thread.
#
# @return [ true | false ] Whether this method should be called again
# to create more connections.
# @raise [ Error::AuthError, Error ] The second socket-related error raised if a retry
# occurred, or the non socket-related error
#
# @api private
def populate
  return false if closed?

  retried = false
  begin
    create_and_add_connection
  rescue Error::SocketError, Error::SocketTimeoutError => e
    # A second socket failure is not retried: let it propagate.
    raise if retried

    # an error was encountered while connecting the connection,
    # ignore this first error and try again.
    retried = true
    log_warn("Populator failed to connect a connection for #{address}: #{e.class}: #{e}. It will retry.")
    retry
  end
end
# Finalize the connection pool for garbage collection.
#
# @param [ List<Mongo::Connection> ] available_connections The available connections.
# @param [ List<Mongo::Connection> ] pending_connections The pending connections.
# @param [ Populator ] populator The populator.
#
# @return [ Proc ] The Finalizer.
def self.finalize(available_connections, pending_connections, populator)
  proc do
    # Available connections first, then pending ones: disconnect each
    # and empty the collection.
    [available_connections, pending_connections].each do |connections|
      connections.each { |connection| connection.disconnect!(reason: :pool_closed) }
      connections.clear
    end
    # Finalizer does not close checked out connections.
    # Those would have to be garbage collected on their own
    # and that should close them.
  end
end
private
# Returns the next available connection, optionally with given
# global id. If no suitable connections are available,
# returns nil.
def next_available_connection(connection_global_id)
  raise_unless_locked!

  # Outside load-balanced mode, or without a specific id, the most
  # recently checked in connection is taken from the top of the stack.
  return @available_connections.pop unless @server.load_balancer? && connection_global_id

  match = @available_connections.find do |candidate|
    candidate.global_id == connection_global_id
  end
  @available_connections.delete(match) if match
  match
end
# Builds a new, not yet connected, connection for this pool's server.
#
# The connection receives the pool options plus a reference to this pool
# and the first element returned by the generation manager's +pipe_fds+.
# Outside load-balanced mode the current generation is passed as well.
def create_connection
  read_end, _write_end = @generation_manager.pipe_fds(service_id: server.description.service_id)

  # Do not pass app metadata - this will be retrieved by the connection
  # based on the auth needs.
  connection_options = options.merge(connection_pool: self, pipe: read_end)
  connection_options[:generation] = generation unless @server.load_balancer?

  Connection.new(@server, connection_options)
end
# Create a connection, connect it, and add it to the pool. Also
# check for stale and interruptable connections and deal with them.
#
# @return [ true | false ] True if a connection was created and
# added to the pool, false otherwise
# @raise [ Mongo::Error ] An error encountered during connection connect
def create_and_add_connection
connection = nil
# Phase 1 (under the lock): decide whether a new connection is needed
# and, if so, register it as pending.
@lock.synchronize do
if !closed? && @ready &&
(unsynchronized_size + @connection_requests) < min_size &&
@pending_connections.length < @max_connecting
then
connection = create_connection
@pending_connections << connection
else
# No connection is to be created: do maintenance instead. A true
# return value reports that some work was done.
return true if remove_interrupted_connections
return true if remove_stale_connection
return false
end
end
# Phase 2 (outside the lock): connect the new connection.
begin
connect_connection(connection)
rescue Exception
# On any failure, drop the connection from the pending set, wake up
# waiters, and re-raise the original exception.
@lock.synchronize do
@pending_connections.delete(connection)
@max_connecting_cv.signal
@size_cv.signal
end
raise
end
# Phase 3 (under the lock): move the connection from pending to available.
@lock.synchronize do
@available_connections << connection
@pending_connections.delete(connection)
@max_connecting_cv.signal
@size_cv.signal
end
true
end
# Removes and disconnects the first stale available connection found, if any.
def remove_stale_connection
  stale = @available_connections.find { |candidate| connection_stale_unlocked?(candidate) }
  # Nothing stale: return nil.
  return unless stale

  stale.disconnect!(reason: :stale)
  @available_connections.delete(stale)
  true
end
# Interrupt connections scheduled for interruption.
# @return [ true | false ] false if nothing was scheduled for
#   interruption, true otherwise.
def remove_interrupted_connections
return false if @interrupt_connections.empty?
# Distinct [generation, service_id] pairs of the connections handled below.
gens = Set.new
while conn = @interrupt_connections.pop
if @checked_out_connections.include?(conn)
# If the connection has been checked out, mark it as interrupted and it will
# be disconnected on check in.
conn.interrupted!
do_check_in(conn)
elsif @pending_connections.include?(conn)
# If the connection is pending, disconnect with the interrupted flag.
conn.disconnect!(reason: :stale, interrupted: true)
@pending_connections.delete(conn)
end
# Recorded for every popped connection, whichever branch (if any) ran.
gens << [ conn.generation, conn.service_id ]
end
# Close the write side of the pipe. Pending connections might be
# hanging on the Kernel#select call, so in order to interrupt that,
# we also listen for the read side of the pipe in Kernel#select and
# close the write side of the pipe here, which will cause select to
# wake up and raise an IOError now that the socket is closed.
# The read side of the pipe will be scheduled for closing on the next
# generation bump.
gens.each do |gen, service_id|
@generation_manager.remove_pipe_fds(gen, service_id: service_id)
end
true
end
# Checks whether a connection is stale.
#
# @param [ Mongo::Server::Connection ] connection The connection to check.
#
# @return [ true | false ] Whether the connection is stale.
def connection_stale_unlocked?(connection)
  # A connection of the current generation is never stale.
  return false if connection.generation == generation_unlocked(service_id: connection.service_id)

  # Pinned connections are not treated as stale even if their
  # generation is outdated.
  !connection.pinned?
end
# Asserts that the pool has not been closed.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
#
# @since 2.9.0
def raise_if_closed!
  # No-op for an open pool.
  return unless closed?

  raise Error::PoolClosedError.new(@server.address, self)
end
# If the connection was interrupted, raise a pool cleared error. If it
# wasn't interrupted raise the original error.
#
# @param [ Connection | nil ] connection The connection, if one was checked out.
# @param [ Mongo::Error ] e The original error.
#
# @raise [ Mongo::Error | Mongo::Error::PoolClearedError ] A PoolClearedError
# if the connection was interrupted, the original error if not.
def maybe_raise_pool_cleared!(connection, e)
  # No connection, or one that was not interrupted: surface the
  # original error unchanged.
  raise e unless connection&.interrupted?

  owning_server = connection.server
  pool_cleared = Error::PoolClearedError.new(owning_server.address, owning_server.pool_internal)
  # Carry the original error's labels over to the replacement error.
  e.labels.each { |label| pool_cleared.add_label(label) }
  raise pool_cleared
end
# Attempts to connect (handshake and auth) the connection. If an error is
# encountered, closes the connection and raises the error.
def connect_connection(connection)
# Inner handler: whatever goes wrong while connecting, disconnect the
# connection and re-raise the same exception.
begin
connection.connect!
rescue Exception
connection.disconnect!(reason: :error)
raise
end
# Outer handler: socket-level failures re-raised above additionally mark
# the server unknown, using the generation and service id carried by the
# exception, before propagating.
rescue Error::SocketError, Error::SocketTimeoutError => exc
@server.unknown!(
generation: exc.generation,
service_id: exc.service_id,
stop_push_monitor: true,
)
raise
end
# Lint-mode sanity check: no available or pending connection may be
# closed. Does nothing unless Lint is enabled.
#
# @raise [ Error::LintError ] If a closed connection is found.
def check_invariants
  return unless Lint.enabled?

  # Server summary calls pool summary which requires pool lock -> deadlock.
  # Obtain the server summary ahead of time.
  server_summary = @server.summary

  @lock.synchronize do
    closed_available = @available_connections.find(&:closed?)
    if closed_available
      raise Error::LintError, "Available connection is closed: #{closed_available} for #{server_summary}"
    end

    closed_pending = @pending_connections.find(&:closed?)
    if closed_pending
      raise Error::LintError, "Pending connection is closed: #{closed_pending} for #{server_summary}"
    end
  end
end
# Close the available connections of previous generations. In load
# balanced mode with a service id, only connections of that service
# are considered.
#
# @param [ Object ] service_id The service id.
def close_available_connections(service_id)
  if @server.load_balancer? && service_id
    # Repeatedly pick the next outdated connection of this service,
    # take it out of the pool and disconnect it.
    while (outdated = @available_connections.find do |candidate|
      candidate.service_id == service_id &&
        candidate.generation < @generation_manager.generation(service_id: service_id)
    end)
      @available_connections.delete(outdated)
      outdated.disconnect!(reason: :stale, interrupted: true)
      @populate_semaphore.signal
    end
  else
    @available_connections.delete_if do |candidate|
      # Connections of the current generation are kept.
      next unless candidate.generation < @generation_manager.generation(service_id: service_id)

      candidate.disconnect!(reason: :stale, interrupted: true)
      @populate_semaphore.signal
      true
    end
  end
end
# Schedule connections of previous generations for interruption.
#
# @param [ Array<Connection> ] connections A list of connections.
# @param [ Object ] service_id The service id.
def schedule_for_interruption(connections, service_id)
  outdated = connections.select do |candidate|
    # In load balanced mode only connections of the given service qualify.
    next false if server.load_balancer? && candidate.service_id != service_id

    candidate.generation < @generation_manager.generation(service_id: service_id)
  end

  @interrupt_connections = @interrupt_connections + outdated
end
# Clear and disconnect the pending connections.
def clear_pending_connections
  loop do
    break if @pending_connections.empty?

    # Disconnect before removal, one connection at a time.
    pending = @pending_connections.first
    pending.disconnect!
    @pending_connections.delete(pending)
  end
end
# The lock should be acquired when calling this method.
def raise_check_out_timeout!(connection_global_id)
  raise_unless_locked!

  failure_event = Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
    @server.address,
    Monitoring::Event::Cmap::ConnectionCheckOutFailed::TIMEOUT,
  )
  publish_cmap_event(failure_event)

  # Only mention the connection id when a specific one was requested.
  requested = connection_global_id ? " for connection #{connection_global_id}" : ''

  msg = [
    "Timed out attempting to check out a connection ",
    "from pool for #{@server.address}#{requested} after #{wait_timeout} sec. ",
    "Connections in pool: #{@available_connections.length} available, ",
    "#{@checked_out_connections.length} checked out, ",
    "#{@pending_connections.length} pending, ",
    "#{@connection_requests} connections requests ",
    "(max size: #{max_size})",
  ].join

  raise Error::ConnectionCheckOutTimeout.new(msg, address: @server.address)
end
# Acquire the pool lock and raise the check out timeout error while
# holding it. For callers that do not already own the lock.
#
# @param [ Integer | nil ] connection_global_id The global id of the
#   specifically requested connection, if any.
def raise_check_out_timeout_locked!(connection_global_id)
  @lock.synchronize { raise_check_out_timeout!(connection_global_id) }
end
# Raise if the pool has been closed; do nothing otherwise.
#
# Before raising, a "check out failed" CMAP event with the POOL_CLOSED
# reason is published.
#
# @raise [ Error::PoolClosedError ] If the pool is closed.
def raise_if_pool_closed!
  return unless closed?

  closed_event = Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
    @server.address,
    Monitoring::Event::Cmap::ConnectionCheckOutFailed::POOL_CLOSED
  )
  publish_cmap_event(closed_event)

  raise Error::PoolClosedError.new(@server.address, self)
end
# Raise if the pool is paused (not marked ready); do nothing otherwise.
#
# The lock should be acquired when calling this method.
#
# @raise [ ArgumentError ] If the lock is not owned.
# @raise [ Error::PoolPausedError ] If the pool is not ready.
def raise_if_pool_paused!
  raise_unless_locked!
  return if @ready

  # CMAP spec decided to conflate pool paused with all the other
  # possible non-timeout errors, hence the CONNECTION_ERROR reason.
  paused_event = Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
    @server.address,
    Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR
  )
  publish_cmap_event(paused_event)

  raise Error::PoolPausedError.new(@server.address, self)
end
# Acquire the pool lock and run the paused-pool check while holding it.
# For callers that do not already own the lock.
#
# @raise [ Error::PoolPausedError ] If the pool is not ready.
def raise_if_pool_paused_locked!
  @lock.synchronize { raise_if_pool_paused! }
end
# Raise unless the pool can currently hand out connections.
#
# Runs three checks in order - lock ownership, pool closed, pool paused -
# and the first failing check raises. The lock check comes first so that a
# caller which forgot to take the lock gets an ArgumentError regardless of
# the pool state.
#
# The lock should be acquired when calling this method.
#
# @raise [ ArgumentError ] If the lock is not owned.
# @raise [ Error::PoolClosedError ] If the pool is closed.
# @raise [ Error::PoolPausedError ] If the pool is not ready.
def raise_if_not_ready!
raise_unless_locked!
raise_if_pool_closed!
raise_if_pool_paused!
end
# Guard used by methods that must only run while the pool lock is held.
#
# @raise [ ArgumentError ] If @lock does not report itself as owned.
def raise_unless_locked!
  return if @lock.owned?

  raise ArgumentError, "the lock must be owned when calling this method"
end
# Decide whether an available connection may be handed out to a caller.
#
# The connection is disconnected and rejected when:
# - it was created by another process (its pid differs from +pid+);
# - it is not pinned and its generation differs from the pool's current
#   generation for the connection's service id;
# - it is not pinned, max_idle_time is set, and the connection was checked
#   in longer ago than max_idle_time.
#
# Every time a connection is disconnected here the populate semaphore is
# signaled.
#
# @param [ Connection ] connection The connection to validate.
# @param [ Integer ] pid The current process id.
# @param [ Integer | nil ] connection_global_id The global id of the
#   requested connection. Not used by this method.
#
# @return [ true | false ] Whether the connection can be used.
def valid_available_connection?(connection, pid, connection_global_id)
  if connection.pid != pid
    log_warn(
      "Detected PID change - Mongo client should have been reconnected " \
      "(old pid #{connection.pid}, new pid #{pid})"
    )
    connection.disconnect!(reason: :stale)
    @populate_semaphore.signal
    return false
  end

  # If connection is marked as pinned, it is used by a transaction
  # or a series of cursor operations in a load balanced setup.
  # In this case connection should not be disconnected until
  # unpinned, so the generation and idle checks are skipped.
  return true if connection.pinned?

  if connection.generation != generation(service_id: connection.service_id)
    # Stale connections should be disconnected in the clear
    # method, but if any were not, check again here.
    connection.disconnect!(reason: :stale)
    @populate_semaphore.signal
    return false
  end

  if max_idle_time && connection.last_checkin &&
     Time.now - connection.last_checkin > max_idle_time
    connection.disconnect!(reason: :idle)
    @populate_semaphore.signal
    return false
  end

  true
end
# Try to obtain a connection for a check out request.
#
# Three outcomes are possible:
# - An available connection is found. It is validated first: an invalid
#   one yields nil, a valid one is returned (and tracked as pending when it
#   is not connected yet).
# - Nothing is available and a specific connection was requested on a load
#   balanced server. If that connection is currently checked out, nil is
#   returned so the caller can wait for it to come back to the pool;
#   otherwise Error::MissingConnection is raised.
# - Otherwise a new connection is created, tracked as pending and returned.
#
# The connection request counter is decremented whenever a connection is
# returned or Error::MissingConnection is raised; it is left untouched when
# nil is returned.
#
# @param [ Integer ] pid The current process id.
# @param [ Integer ] connection_global_id The global id for the
#   connection to check out.
#
# @return [ Mongo::Server::Connection | nil ] The connection, or nil when
#   none could be obtained on this attempt.
#
# @raise [ Error::MissingConnection ] If the requested connection is
#   neither available nor checked out.
def get_connection(pid, connection_global_id)
  candidate = next_available_connection(connection_global_id)

  if candidate
    return nil unless valid_available_connection?(candidate, pid, connection_global_id)

    # We've got a connection, so the request is fulfilled. No condition
    # variable needs to be signaled here; that happens later, once the
    # caller has finished with the check out.
    @connection_requests -= 1

    # Only connections that still have to be connected count as pending.
    @pending_connections << candidate unless candidate.connected?
    return candidate
  end

  unless connection_global_id && @server.load_balancer?
    fresh = create_connection
    @connection_requests -= 1
    @pending_connections << fresh
    return fresh
  end

  # A particular connection is requested, but it is not available.
  # If it is neither available nor checked out, we should stop here.
  in_use = @checked_out_connections.detect do |conn|
    conn.global_id == connection_global_id
  end

  if in_use.nil?
    failure_event = Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
      @server.address,
      Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR
    )
    publish_cmap_event(failure_event)
    # We're going to raise, so the request will never be fulfilled.
    decrement_connection_requests_and_signal
    raise Error::MissingConnection.new
  end

  # The requested connection is checked out; an in-progress operation may
  # return it to the pool, so report "nothing yet".
  nil
end
# Check out a connection and make sure it is connected.
#
# Works in three steps:
# 1. Under the lock, wait until the number of unavailable connections is
#    below max_size (a max_size of 0 disables this gate), register a
#    connection request and wait for a connection.
# 2. Outside the lock, connect the connection if it is not connected yet.
# 3. Under the lock again, move the connection from the pending set to the
#    checked out set and signal the max-connecting condition variable.
#
# @param [ Integer ] connection_global_id The global id for the
#   connection to check out.
#
# @return [ Mongo::Server::Connection ] The checked out connection.
#
# @raise [ Error::PoolClosedError ] If the pool has been closed.
# @raise [ Error::PoolPausedError ] If the pool is paused.
# @raise [ Error::ConnectionCheckOutTimeout ] If no connection could be
#   obtained before the wait timeout elapsed.
def retrieve_and_connect_connection(connection_global_id)
  deadline = Utils.monotonic_time + wait_timeout

  connection = @lock.synchronize do
    # The first gate to checking out a connection. Make sure the number of
    # unavailable connections is less than the max pool size.
    while max_size != 0 && unavailable_connections >= max_size
      remaining = deadline - Utils.monotonic_time
      raise_check_out_timeout!(connection_global_id) if remaining <= 0
      @size_cv.wait(remaining)
      raise_if_not_ready!
    end

    @connection_requests += 1
    wait_for_connection(connection_global_id, deadline)
  end

  # Connecting happens without the lock held.
  connect_or_raise(connection) unless connection.connected?

  @lock.synchronize do
    @checked_out_connections << connection
    @pending_connections.delete(connection) if @pending_connections.include?(connection)
    @max_connecting_cv.signal
    # No need to signal size_cv here since the number of unavailable
    # connections is unchanged.
  end

  connection
end
# Waits for a connection to become available, or raises if no connection
# becomes available before the deadline.
#
# Each round first waits on the max-connecting condition variable until
# either an available connection exists or fewer than @max_connecting
# connections are pending, then asks get_connection for a connection. The
# rounds repeat while get_connection yields nil and the deadline has not
# passed.
#
# Whenever this method gives up because of the deadline, it decrements the
# connection request counter (and signals) before raising, since the
# request will not be fulfilled.
#
# @param [ Integer ] connection_global_id The global id for the
#   connection to check out.
# @param [ Float ] deadline The time at which to stop waiting.
#
# @return [ Mongo::Server::Connection ] The checked out connection.
def wait_for_connection(connection_global_id, deadline)
  found = nil

  while found.nil?
    # The second gate to checking out a connection. Make sure 1) there
    # exists an available connection or 2) we are under max_connecting.
    while @available_connections.none? && @pending_connections.length >= @max_connecting
      remaining = deadline - Utils.monotonic_time
      if remaining <= 0
        # Timing out: the connection request is not going to be
        # fulfilled, so take it off the counter before raising.
        decrement_connection_requests_and_signal
        raise_check_out_timeout!(connection_global_id)
      end

      @max_connecting_cv.wait(remaining)
      # We do not need to decrement the connection_requests counter
      # or signal here because the pool is not ready yet.
      raise_if_not_ready!
    end

    found = get_connection(Process.pid, connection_global_id)
    remaining = deadline - Utils.monotonic_time
    next unless found.nil? && remaining <= 0

    # get_connection produced nothing, which means it did not decrease
    # the connection_requests counter. We need to do it here.
    decrement_connection_requests_and_signal
    raise_check_out_timeout!(connection_global_id)
  end

  found
end
# Connects a connection, re-raising whatever exception the attempt raised.
#
# On failure the connection is removed from the pending set, the
# max-connecting and size condition variables and the populate semaphore
# are signaled, and a "check out failed" CMAP event with the
# CONNECTION_ERROR reason is published before the exception is re-raised.
#
# @param [ Mongo::Server::Connection ] connection The connection to connect.
#
# @return [ Object ] Whatever connect_connection returns, on success.
def connect_or_raise(connection)
  connect_connection(connection)
rescue Exception
  # Handshake or authentication failed. Every exception class is caught on
  # purpose: the bookkeeping below must run no matter what went wrong, and
  # the exception is re-raised unchanged at the end.
  @lock.synchronize do
    @pending_connections.delete(connection) if @pending_connections.include?(connection)
    @max_connecting_cv.signal
    @size_cv.signal
  end
  @populate_semaphore.signal

  failure_event = Monitoring::Event::Cmap::ConnectionCheckOutFailed.new(
    @server.address,
    Monitoring::Event::Cmap::ConnectionCheckOutFailed::CONNECTION_ERROR
  )
  publish_cmap_event(failure_event)

  raise
end
# Decrement the connection requests counter and signal the condition
# variables that the number of unavailable connections has decreased.
#
# Used when a registered connection request is abandoned (for example
# because the check out timed out), so that threads waiting on either the
# max-connecting gate or the pool-size gate get a chance to proceed.
#
# NOTE(review): the callers visible in this file invoke this method while
# holding @lock; the method itself does not verify that - confirm before
# calling it from a new place.
def decrement_connection_requests_and_signal
@connection_requests -= 1
@max_connecting_cv.signal
@size_cv.signal
end
end
end
end
require 'mongo/server/connection_pool/generation_manager'
require 'mongo/server/connection_pool/populator'