lib/mongo/server_selector/base.rb
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
module ServerSelector
class Base
# Initialize the server selector.
#
# @example Initialize the selector.
# Mongo::ServerSelector::Secondary.new(:tag_sets => [{'dc' => 'nyc'}])
#
# @example Initialize the preference with no options.
# Mongo::ServerSelector::Secondary.new
#
# @param [ Hash ] options The server preference options.
#
# @option options [ Integer ] :local_threshold The local threshold boundary for
# nearest selection in seconds.
# @option options [ Integer ] max_staleness The maximum replication lag,
# in seconds, that a secondary can suffer and still be eligible for a read.
# A value of -1 is treated identically to nil, which is to not
# have a maximum staleness.
# @option options [ Hash | nil ] hedge A Hash specifying whether to enable hedged
# reads on the server. Hedged reads are not enabled by default. When
# specifying this option, it must be in the format: { enabled: true },
# where the value of the :enabled key is a boolean value.
#
# @raise [ Error::InvalidServerPreference ] If tag sets are specified
# but not allowed.
#
# @api private
def initialize(options = nil)
options = options ? options.dup : {}
if options[:max_staleness] == -1
options.delete(:max_staleness)
end
@options = options
@tag_sets = options[:tag_sets] || []
@max_staleness = options[:max_staleness]
@hedge = options[:hedge]
validate!
end
# @return [ Hash ] options The options.
attr_reader :options
# @return [ Array ] tag_sets The tag sets used to select servers.
attr_reader :tag_sets
# @return [ Integer ] max_staleness The maximum replication lag, in
# seconds, that a secondary can suffer and still be eligible for a read.
#
# @since 2.4.0
attr_reader :max_staleness
# @return [ Hash | nil ] hedge The document specifying whether to enable
# hedged reads.
attr_reader :hedge
# Get the timeout for server selection.
#
# @example Get the server selection timeout, in seconds.
# selector.server_selection_timeout
#
# @return [ Float ] The timeout.
#
# @since 2.0.0
#
# @deprecated This setting is now taken from the cluster options when
# a server is selected. Will be removed in version 3.0.
def server_selection_timeout
@server_selection_timeout ||=
(options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT)
end
# Get the local threshold boundary for nearest selection in seconds.
#
# @example Get the local threshold.
# selector.local_threshold
#
# @return [ Float ] The local threshold.
#
# @since 2.0.0
#
# @deprecated This setting is now taken from the cluster options when
# a server is selected. Will be removed in version 3.0.
def local_threshold
@local_threshold ||= (options[:local_threshold] || ServerSelector::LOCAL_THRESHOLD)
end
# @api private
def local_threshold_with_cluster(cluster)
options[:local_threshold] || cluster.options[:local_threshold] || LOCAL_THRESHOLD
end
# Inspect the server selector.
#
# @example Inspect the server selector.
# selector.inspect
#
# @return [ String ] The inspection.
#
# @since 2.2.0
def inspect
"#<#{self.class.name}:0x#{object_id} tag_sets=#{tag_sets.inspect} max_staleness=#{max_staleness.inspect} hedge=#{hedge}>"
end
# Check equality of two server selectors.
#
# @example Check server selector equality.
# preference == other
#
# @param [ Object ] other The other preference.
#
# @return [ true, false ] Whether the objects are equal.
#
# @since 2.0.0
def ==(other)
name == other.name && hedge == other.hedge &&
max_staleness == other.max_staleness && tag_sets == other.tag_sets
end
# Select a server from the specified cluster, taking into account
# mongos pinning for the specified session.
#
# If the session is given and has a pinned server, this server is the
# only server considered for selection. If the server is of type mongos,
# it is returned immediately; otherwise monitoring checks on this
# server are initiated to update its status, and if the server becomes
# a mongos within the server selection timeout, it is returned.
#
# If no session is given or the session does not have a pinned server,
# normal server selection process is performed among all servers in the
# specified cluster matching the preference of this server selector
# object. Monitoring checks are initiated on servers in the cluster until
# a suitable server is found, up to the server selection timeout.
#
# If a suitable server is not found within the server selection timeout,
# this method raises Error::NoServerAvailable.
#
# @param [ Mongo::Cluster ] cluster The cluster from which to select
# an eligible server.
# @param [ true, false ] ping Whether to ping the server before selection.
# Deprecated and ignored.
# @param [ Session | nil ] session Optional session to take into account
# for mongos pinning. Added in version 2.10.0.
# @param [ true | false ] write_aggregation Whether we need a server that
# supports writing aggregations (e.g. with $merge/$out) on secondaries.
# @param [ Array<Server> ] deprioritized A list of servers that should
# be selected from only if no other servers are available. This is
# used to avoid selecting the same server twice in a row when
# retrying a command.
#
# @return [ Mongo::Server ] A server matching the server preference.
#
# @raise [ Error::NoServerAvailable ] No server was found matching the
# specified preference / pinning requirement in the server selection
# timeout.
# @raise [ Error::LintError ] An unexpected condition was detected, and
# lint mode is enabled.
#
# @since 2.0.0
def select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: [])
select_server_impl(cluster, ping, session, write_aggregation, deprioritized).tap do |server|
if Lint.enabled? && !server.pool.ready?
raise Error::LintError, 'Server selector returning a server with a pool which is not ready'
end
end
end
# Parameters and return values are the same as for select_server.
private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized)
if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
return cluster.servers.first
end
server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT
# Special handling for zero timeout: if we have to select a server,
# and the timeout is zero, fail immediately (since server selection
# will take some non-zero amount of time in any case).
if server_selection_timeout == 0
msg = "Failing server selection due to zero timeout. " +
" Requested #{name} in cluster: #{cluster.summary}"
raise Error::NoServerAvailable.new(self, cluster, msg)
end
deadline = Utils.monotonic_time + server_selection_timeout
if session && session.pinned_server
if Mongo::Lint.enabled?
unless cluster.sharded?
raise Error::LintError, "Session has a pinned server in a non-sharded topology: #{topology}"
end
end
if !session.in_transaction?
session.unpin
end
if server = session.pinned_server
# Here we assume that a mongos stays in the topology indefinitely.
# This will no longer be the case once SRV polling is implemented.
unless server.mongos?
while (time_remaining = deadline - Utils.monotonic_time) > 0
wait_for_server_selection(cluster, time_remaining)
end
unless server.mongos?
msg = "The session being used is pinned to the server which is not a mongos: #{server.summary} " +
"(after #{server_selection_timeout} seconds)"
raise Error::NoServerAvailable.new(self, cluster, msg)
end
end
return server
end
end
if cluster.replica_set?
validate_max_staleness_value_early!
end
if cluster.addresses.empty?
if Lint.enabled?
unless cluster.servers.empty?
raise Error::LintError, "Cluster has no addresses but has servers: #{cluster.servers.map(&:inspect).join(', ')}"
end
end
msg = "Cluster has no addresses, and therefore will never have a server"
raise Error::NoServerAvailable.new(self, cluster, msg)
end
=begin Add this check in version 3.0.0
unless cluster.connected?
msg = 'Cluster is disconnected'
raise Error::NoServerAvailable.new(self, cluster, msg)
end
=end
loop do
if Lint.enabled?
cluster.servers.each do |server|
# TODO: Add this back in RUBY-3174.
# if !server.unknown? && !server.connected?
# raise Error::LintError, "Server #{server.summary} is known but is not connected"
# end
if !server.unknown? && !server.pool.ready?
raise Error::LintError, "Server #{server.summary} is known but has non-ready pool"
end
end
end
server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized)
if server
unless cluster.topology.compatible?
raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s
end
if session && session.starting_transaction? && cluster.sharded?
session.pin_to_server(server)
end
return server
end
cluster.scan!(false)
time_remaining = deadline - Utils.monotonic_time
if time_remaining > 0
wait_for_server_selection(cluster, time_remaining)
# If we wait for server selection, perform another round of
# attempting to locate a suitable server. Otherwise server selection
# can raise NoServerAvailable message when the diagnostics
# reports an available server of the requested type.
else
break
end
end
msg = "No #{name} server"
if is_a?(ServerSelector::Secondary) && !tag_sets.empty?
msg += " with tag sets: #{tag_sets}"
end
msg += " is available in cluster: #{cluster.summary} " +
"with timeout=#{server_selection_timeout}, " +
"LT=#{local_threshold_with_cluster(cluster)}"
msg += server_selection_diagnostic_message(cluster)
raise Error::NoServerAvailable.new(self, cluster, msg)
rescue Error::NoServerAvailable => e
if session && session.in_transaction? && !session.committing_transaction?
e.add_label('TransientTransactionError')
end
if session && session.committing_transaction?
e.add_label('UnknownTransactionCommitResult')
end
raise e
end
# Tries to find a suitable server, returns the server if one is available
# or nil if there isn't a suitable server.
#
# @param [ Mongo::Cluster ] cluster The cluster from which to select
# an eligible server.
# @param [ true | false ] write_aggregation Whether we need a server that
# supports writing aggregations (e.g. with $merge/$out) on secondaries.
# @param [ Array<Server> ] deprioritized A list of servers that should
# be selected from only if no other servers are available. This is
# used to avoid selecting the same server twice in a row when
# retrying a command.
#
# @return [ Server | nil ] A suitable server, if one exists.
#
# @api private
def try_select_server(cluster, write_aggregation: false, deprioritized: [])
servers = if write_aggregation && cluster.replica_set?
# 1. Check if ALL servers in cluster support secondary writes.
is_write_supported = cluster.servers.reduce(true) do |res, server|
res && server.features.merge_out_on_secondary_enabled?
end
if is_write_supported
# 2. If all servers support secondary writes, we respect read preference.
suitable_servers(cluster)
else
# 3. Otherwise we fallback to primary for replica set.
[cluster.servers.detect(&:primary?)]
end
else
suitable_servers(cluster)
end
# This list of servers may be ordered in a specific way
# by the selector (e.g. for secondary preferred, the first
# server may be a secondary and the second server may be primary)
# and we should take the first server here respecting the order
server = suitable_server(servers, deprioritized)
if server
if Lint.enabled?
# It is possible for a server to have a nil average RTT here
# because the ARTT comes from description which may be updated
# by a background thread while server selection is running.
# Currently lint mode is not a public feature, if/when this
# changes (https://jira.mongodb.org/browse/RUBY-1576) the
# requirement for ARTT to be not nil would need to be removed.
if server.average_round_trip_time.nil?
raise Error::LintError, "Server #{server.address} has nil average rtt"
end
end
end
server
end
# Returns servers of acceptable types from the cluster.
#
# Does not perform staleness validation, staleness filtering or
# latency filtering.
#
# @param [ Cluster ] cluster The cluster.
#
# @return [ Array<Server> ] The candidate servers.
#
# @api private
def candidates(cluster)
servers = cluster.servers
servers.each do |server|
validate_max_staleness_support!(server)
end
if cluster.single?
servers
elsif cluster.sharded?
servers
elsif cluster.replica_set?
select_in_replica_set(servers)
else
# Unknown cluster - no servers
[]
end
end
# Returns servers satisfying the server selector from the cluster.
#
# @param [ Cluster ] cluster The cluster.
#
# @return [ Array<Server> ] The suitable servers.
#
# @api private
def suitable_servers(cluster)
if cluster.single?
candidates(cluster)
elsif cluster.sharded?
local_threshold = local_threshold_with_cluster(cluster)
servers = candidates(cluster)
near_servers(servers, local_threshold)
elsif cluster.replica_set?
validate_max_staleness_value!(cluster)
candidates(cluster)
else
# Unknown cluster - no servers
[]
end
end
private
# Returns a server from the list of servers that is suitable for
# executing the operation.
#
# @param [ Array<Server> ] servers The candidate servers.
# @param [ Array<Server> ] deprioritized A list of servers that should
# be selected from only if no other servers are available.
#
# @return [ Server | nil ] The suitable server or nil if no suitable
# server is available.
def suitable_server(servers, deprioritized)
preferred = servers - deprioritized
if preferred.empty?
servers.first
else
preferred.first
end
end
# Convert this server preference definition into a format appropriate
# for sending to a MongoDB server (i.e., as a command field).
#
# @return [ Hash ] The server preference formatted as a command field value.
#
# @since 2.0.0
def full_doc
@full_doc ||= begin
preference = { :mode => self.class.const_get(:SERVER_FORMATTED_NAME) }
preference.update(tags: tag_sets) unless tag_sets.empty?
preference.update(maxStalenessSeconds: max_staleness) if max_staleness
preference.update(hedge: hedge) if hedge
preference
end
end
# Select the primary from a list of provided candidates.
#
# @param [ Array ] candidates List of candidate servers to select the
# primary from.
#
# @return [ Array ] The primary.
#
# @since 2.0.0
def primary(candidates)
candidates.select do |server|
server.primary?
end
end
# Select the secondaries from a list of provided candidates.
#
# @param [ Array ] candidates List of candidate servers to select the
# secondaries from.
#
# @return [ Array ] The secondary servers.
#
# @since 2.0.0
def secondaries(candidates)
matching_servers = candidates.select(&:secondary?)
matching_servers = filter_stale_servers(matching_servers, primary(candidates).first)
matching_servers = match_tag_sets(matching_servers) unless tag_sets.empty?
# Per server selection spec the server selected MUST be a random
# one matching staleness and latency requirements.
# Selectors always pass the output of #secondaries to #nearest
# which shuffles the server list, fulfilling this requirement.
matching_servers
end
# Select the near servers from a list of provided candidates, taking the
# local threshold into account.
#
# @param [ Array ] candidates List of candidate servers to select the
# near servers from.
# @param [ Integer ] local_threshold Local threshold. This parameter
# will be required in driver version 3.0.
#
# @return [ Array ] The near servers.
#
# @since 2.0.0
def near_servers(candidates = [], local_threshold = nil)
return candidates if candidates.empty?
# Average RTT on any server may change at any time by the server
# monitor's background thread. ARTT may also become nil if the
# server is marked unknown. Take a snapshot of ARTTs for the duration
# of this method.
candidates = candidates.map do |server|
{server: server, artt: server.average_round_trip_time}
end.reject do |candidate|
candidate[:artt].nil?
end
return candidates if candidates.empty?
nearest_candidate = candidates.min_by do |candidate|
candidate[:artt]
end
# Default for legacy signarure
local_threshold ||= self.local_threshold
threshold = nearest_candidate[:artt] + local_threshold
candidates.select do |candidate|
candidate[:artt] <= threshold
end.map do |candidate|
candidate[:server]
end.shuffle!
end
# Select the servers matching the defined tag sets.
#
# @param [ Array ] candidates List of candidate servers from which those
# matching the defined tag sets should be selected.
#
# @return [ Array ] The servers matching the defined tag sets.
#
# @since 2.0.0
def match_tag_sets(candidates)
matches = []
tag_sets.find do |tag_set|
matches = candidates.select { |server| server.matches_tag_set?(tag_set) }
!matches.empty?
end
matches || []
end
def filter_stale_servers(candidates, primary = nil)
return candidates unless @max_staleness
# last_scan is filled out by the Monitor, and can be nil if a server
# had its description manually set rather than being normally updated
# via the SDAM flow. We don't handle the possibility of a nil
# last_scan here.
if primary
candidates.select do |server|
validate_max_staleness_support!(server)
staleness = (server.last_scan - server.last_write_date) -
(primary.last_scan - primary.last_write_date) +
server.cluster.heartbeat_interval
staleness <= @max_staleness
end
else
max_write_date = candidates.collect(&:last_write_date).max
candidates.select do |server|
validate_max_staleness_support!(server)
staleness = max_write_date - server.last_write_date + server.cluster.heartbeat_interval
staleness <= @max_staleness
end
end
end
def validate!
if !@tag_sets.all? { |set| set.empty? } && !tags_allowed?
raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_TAG_SUPPORT)
elsif @max_staleness && !max_staleness_allowed?
raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_SUPPORT)
end
if @hedge
unless hedge_allowed?
raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_HEDGE_SUPPORT)
end
unless @hedge.is_a?(Hash) && @hedge.key?(:enabled) &&
[true, false].include?(@hedge[:enabled])
raise Error::InvalidServerPreference.new(
"`hedge` value (#{hedge}) is invalid - hedge must be a Hash in the " \
"format { enabled: true }"
)
end
end
end
def validate_max_staleness_support!(server)
if @max_staleness && !server.features.max_staleness_enabled?
raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_WITH_LEGACY_SERVER)
end
end
def validate_max_staleness_value_early!
if @max_staleness
unless @max_staleness >= SMALLEST_MAX_STALENESS_SECONDS
msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
"`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS})"
raise Error::InvalidServerPreference.new(msg)
end
end
end
def validate_max_staleness_value!(cluster)
if @max_staleness
heartbeat_interval = cluster.heartbeat_interval
unless @max_staleness >= [
SMALLEST_MAX_STALENESS_SECONDS,
min_cluster_staleness = heartbeat_interval + Cluster::IDLE_WRITE_PERIOD_SECONDS,
].max
msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
"`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS}) and (the cluster's heartbeat_frequency " +
"setting + `Mongo::Cluster::IDLE_WRITE_PERIOD_SECONDS`) (#{min_cluster_staleness})"
raise Error::InvalidServerPreference.new(msg)
end
end
end
# Waits for server state changes in the specified cluster.
#
# If the cluster has a server selection semaphore, waits on that
# semaphore up to the specified remaining time. Any change in server
# state resulting from SDAM will immediately wake up this method and
# cause it to return.
#
# If the cluster des not have a server selection semaphore, waits
# the smaller of 0.25 seconds and the specified remaining time.
# This functionality is provided for backwards compatibilty only for
# applications directly invoking the server selection process.
# If lint mode is enabled and the cluster does not have a server
# selection semaphore, Error::LintError will be raised.
#
# @param [ Cluster ] cluster The cluster to wait for.
# @param [ Numeric ] time_remaining Maximum time to wait, in seconds.
def wait_for_server_selection(cluster, time_remaining)
if cluster.server_selection_semaphore
# Since the semaphore may have been signaled between us checking
# the servers list earlier and the wait call below, we should not
# wait for the full remaining time - wait for up to 0.5 second, then
# recheck the state.
cluster.server_selection_semaphore.wait([time_remaining, 0.5].min)
else
if Lint.enabled?
raise Error::LintError, 'Waiting for server selection without having a server selection semaphore'
end
sleep [time_remaining, 0.25].min
end
end
# Creates a diagnostic message when server selection fails.
#
# The diagnostic message includes the following information, as applicable:
#
# - Servers having dead monitor threads
# - Cluster is disconnected
#
# If none of the conditions for diagnostic messages apply, an empty string
# is returned.
#
# @param [ Cluster ] cluster The cluster on which server selection was
# performed.
#
# @return [ String ] The diagnostic message.
def server_selection_diagnostic_message(cluster)
msg = ''
dead_monitors = []
cluster.servers_list.each do |server|
thread = server.monitor.instance_variable_get('@thread')
if thread.nil? || !thread.alive?
dead_monitors << server
end
end
if dead_monitors.any?
msg += ". The following servers have dead monitor threads: #{dead_monitors.map(&:summary).join(', ')}"
end
unless cluster.connected?
msg += ". The cluster is disconnected (client may have been closed)"
end
msg
end
end
end
end