lib/mongo/session/session_pool.rb
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2017-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Session
# A pool of server sessions.
#
# @api private
#
# @since 2.5.0
class SessionPool
# Initialize a SessionPool.
#
# @example
# SessionPool.new(cluster)
#
# @param [ Mongo::Cluster ] cluster The cluster that will be associated with this
# session pool.
#
# @since 2.5.0
def initialize(cluster)
@queue = []
@mutex = Mutex.new
@cluster = cluster
end
# Get a formatted string for use in inspection.
#
# @example Inspect the session pool object.
# session_pool.inspect
#
# @return [ String ] The session pool inspection.
#
# @since 2.5.0
def inspect
"#<Mongo::Session::SessionPool:0x#{object_id} current_size=#{@queue.size}>"
end
# Check out a server session from the pool.
#
# @example Check out a session.
# pool.checkout
#
# @return [ ServerSession ] The server session.
#
# @since 2.5.0
def checkout
@mutex.synchronize do
loop do
if @queue.empty?
return ServerSession.new
else
session = @queue.shift
unless about_to_expire?(session)
return session
end
end
end
end
end
# Checkin a server session to the pool.
#
# @example Checkin a session.
# pool.checkin(session)
#
# @param [ Session::ServerSession ] session The session to checkin.
#
# @since 2.5.0
def checkin(session)
if session.nil?
raise ArgumentError, 'session cannot be nil'
end
@mutex.synchronize do
prune!
@queue.unshift(session) if return_to_queue?(session)
end
end
# End all sessions in the pool by sending the endSessions command to the server.
#
# @example End all sessions.
# pool.end_sessions
#
# @since 2.5.0
def end_sessions
while !@queue.empty?
server = ServerSelector.get(mode: :primary_preferred).select_server(@cluster)
op = Operation::Command.new(
selector: {
endSessions: @queue.shift(10_000).map(&:session_id),
},
db_name: Database::ADMIN,
)
context = Operation::Context.new(options: {
server_api: server.options[:server_api],
})
op.execute(server, context: context)
end
rescue Mongo::Error, Error::AuthError
end
private
# Query whether the given session is okay to return to the
# pool's queue.
#
# @param [ Session::ServerSession ] session the session to query
#
# @return [ true | false ] whether to return the session to the
# queue.
def return_to_queue?(session)
!session.dirty? && !about_to_expire?(session)
end
def about_to_expire?(session)
if session.nil?
raise ArgumentError, 'session cannot be nil'
end
# Load balancers spec explicitly requires to ignore the logical session
# timeout value.
# No rationale is provided as of the time of this writing.
if @cluster.load_balanced?
return false
end
logical_session_timeout = @cluster.logical_session_timeout
if logical_session_timeout
idle_time_minutes = (Time.now - session.last_use) / 60
(idle_time_minutes + 1) >= logical_session_timeout
end
end
def prune!
# Load balancers spec explicitly requires not to prune sessions.
# No rationale is provided as of the time of this writing.
return if @cluster.load_balanced?
while !@queue.empty?
if about_to_expire?(@queue[-1])
@queue.pop
else
break
end
end
end
end
end
end