mongodb/mongo-ruby-driver

View on GitHub
lib/mongo/server_selector/base.rb

Summary

Maintainability
F
3 days
Test Coverage
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo

  module ServerSelector

    class Base

      # Build a server selector from the given preference options.
      #
      # The options hash is duplicated before use, so the caller's hash is
      # never modified. A :max_staleness of -1 is removed from the copy, which
      # makes it equivalent to not specifying a maximum staleness at all.
      #
      # @example Initialize the selector.
      #   Mongo::ServerSelector::Secondary.new(:tag_sets => [{'dc' => 'nyc'}])
      #
      # @example Initialize the preference with no options.
      #   Mongo::ServerSelector::Secondary.new
      #
      # @param [ Hash | nil ] options The server preference options.
      #
      # @option options [ Integer ] :local_threshold The local threshold boundary for
      #   nearest selection in seconds.
      # @option options [ Array<Hash> ] :tag_sets The tag sets used to select
      #   servers. Defaults to an empty array.
      # @option options [ Integer ] :max_staleness The maximum replication lag,
      #   in seconds, that a secondary can suffer and still be eligible for a read.
      #   A value of -1 is treated identically to nil, which is to not
      #   have a maximum staleness.
      # @option options [ Hash | nil ] :hedge A Hash specifying whether to enable hedged
      #   reads on the server. Hedged reads are not enabled by default. When
      #   specifying this option, it must be in the format: { enabled: true },
      #   where the value of the :enabled key is a boolean value.
      #
      # @raise [ Error::InvalidServerPreference ] If the options fail the
      #   checks performed by #validate! (for example tag sets are specified
      #   but not allowed).
      #
      # @api private
      def initialize(options = nil)
        settings = options ? options.dup : {}
        # -1 means "no maximum staleness"; normalize it to an absent key.
        settings.delete(:max_staleness) if settings[:max_staleness] == -1

        @options = settings
        @tag_sets = settings[:tag_sets] || []
        @max_staleness = settings[:max_staleness]
        @hedge = settings[:hedge]

        validate!
      end

      # @return [ Hash ] options The options this selector was built with.
      #   This is a copy of the hash given to the constructor, without the
      #   :max_staleness key when that was given as -1.
      attr_reader :options

      # @return [ Array ] tag_sets The tag sets used to select servers.
      #   An empty array when no :tag_sets option was given.
      attr_reader :tag_sets

      # @return [ Integer | nil ] max_staleness The maximum replication lag, in
      #   seconds, that a secondary can suffer and still be eligible for a read.
      #   nil when no maximum staleness applies.
      #
      # @since 2.4.0
      attr_reader :max_staleness

      # @return [ Hash | nil ] hedge The document specifying whether to enable
      #   hedged reads, as given in the :hedge option.
      attr_reader :hedge

      # Get the timeout for server selection.
      #
      # The value comes from the :server_selection_timeout option of this
      # selector, falling back to ServerSelector::SERVER_SELECTION_TIMEOUT.
      # The result is memoized.
      #
      # @example Get the server selection timeout, in seconds.
      #   selector.server_selection_timeout
      #
      # @return [ Float ] The timeout.
      #
      # @since 2.0.0
      #
      # @deprecated This setting is now taken from the cluster options when
      #   a server is selected. Will be removed in version 3.0.
      def server_selection_timeout
        @server_selection_timeout ||= begin
          configured = options[:server_selection_timeout]
          configured || ServerSelector::SERVER_SELECTION_TIMEOUT
        end
      end

      # Get the local threshold boundary for nearest selection in seconds.
      #
      # The value comes from the :local_threshold option of this selector,
      # falling back to ServerSelector::LOCAL_THRESHOLD. The result is
      # memoized.
      #
      # @example Get the local threshold.
      #   selector.local_threshold
      #
      # @return [ Float ] The local threshold.
      #
      # @since 2.0.0
      #
      # @deprecated This setting is now taken from the cluster options when
      #   a server is selected. Will be removed in version 3.0.
      def local_threshold
        @local_threshold ||= begin
          configured = options[:local_threshold]
          configured || ServerSelector::LOCAL_THRESHOLD
        end
      end

      # Resolve the local threshold to use for the given cluster.
      #
      # The selector's own :local_threshold option wins; otherwise the
      # cluster's :local_threshold option is used; otherwise LOCAL_THRESHOLD.
      #
      # @param [ Cluster ] cluster The cluster whose options are consulted.
      #
      # @return [ Numeric ] The local threshold.
      #
      # @api private
      def local_threshold_with_cluster(cluster)
        own_threshold = options[:local_threshold]
        return own_threshold if own_threshold

        cluster.options[:local_threshold] || LOCAL_THRESHOLD
      end

      # Inspect the server selector.
      #
      # The result names the concrete selector class and object id, followed
      # by the tag sets, max staleness and hedge settings.
      #
      # @example Inspect the server selector.
      #   selector.inspect
      #
      # @return [ String ] The inspection.
      #
      # @since 2.2.0
      def inspect
        details = [
          "tag_sets=#{tag_sets.inspect}",
          "max_staleness=#{max_staleness.inspect}",
          "hedge=#{hedge}",
        ].join(' ')

        "#<#{self.class.name}:0x#{object_id} #{details}>"
      end

      # Check equality of two server selectors.
      #
      # Two selectors are equal when their name, hedge, max staleness and
      # tag sets are all equal. Objects that do not expose all of these
      # attributes (for example nil) are never equal to a selector; comparing
      # against them returns false rather than raising NoMethodError.
      #
      # @example Check server selector equality.
      #   preference == other
      #
      # @param [ Object ] other The other preference.
      #
      # @return [ true, false ] Whether the objects are equal.
      #
      # @since 2.0.0
      def ==(other)
        # Guard against non-selector operands so that == never raises.
        comparable = %i[name hedge max_staleness tag_sets].all? do |attribute|
          other.respond_to?(attribute)
        end
        return false unless comparable

        name == other.name && hedge == other.hedge &&
          max_staleness == other.max_staleness && tag_sets == other.tag_sets
      end

      # Select a server from the specified cluster, taking into account
      # mongos pinning for the specified session.
      #
      # If the session is given and has a pinned server, this server is the
      # only server considered for selection. If the server is of type mongos,
      # it is returned immediately; otherwise monitoring checks on this
      # server are initiated to update its status, and if the server becomes
      # a mongos within the server selection timeout, it is returned.
      #
      # If no session is given or the session does not have a pinned server,
      # normal server selection process is performed among all servers in the
      # specified cluster matching the preference of this server selector
      # object. Monitoring checks are initiated on servers in the cluster until
      # a suitable server is found, up to the server selection timeout.
      #
      # If a suitable server is not found within the server selection timeout,
      # this method raises Error::NoServerAvailable.
      #
      # The selection itself is delegated to #select_server_impl; this method
      # only adds a lint-mode check that the returned server's pool is ready.
      #
      # @param [ Mongo::Cluster ] cluster The cluster from which to select
      #   an eligible server.
      # @param [ true, false ] ping Whether to ping the server before selection.
      #   Deprecated and ignored.
      # @param [ Session | nil ] session Optional session to take into account
      #   for mongos pinning. Added in version 2.10.0.
      # @param [ true | false ] write_aggregation Whether we need a server that
      #   supports writing aggregations (e.g. with $merge/$out) on secondaries.
      # @param [ Array<Server> ] deprioritized A list of servers that should
      #   be selected from only if no other servers are available. This is
      #   used to avoid selecting the same server twice in a row when
      #   retrying a command.
      #
      # @return [ Mongo::Server ] A server matching the server preference.
      #
      # @raise [ Error::NoServerAvailable ] No server was found matching the
      #   specified preference / pinning requirement in the server selection
      #   timeout.
      # @raise [ Error::LintError ] An unexpected condition was detected, and
      #   lint mode is enabled.
      #
      # @since 2.0.0
      def select_server(cluster, ping = nil, session = nil, write_aggregation: false, deprioritized: [])
        selected = select_server_impl(cluster, ping, session, write_aggregation, deprioritized)

        if Lint.enabled?
          unless selected.pool.ready?
            raise Error::LintError, 'Server selector returning a server with a pool which is not ready'
          end
        end

        selected
      end

      # Performs the selection work on behalf of #select_server.
      #
      # Parameters and return values are the same as for select_server.
      # The ping argument is accepted but not used.
      #
      # Selection proceeds in this order:
      #
      # - In a load-balanced topology the first server of the cluster is
      #   returned right away.
      # - A server selection timeout of zero fails immediately.
      # - If the session has a pinned server, it is the only server
      #   considered. The session is unpinned first when it is not in a
      #   transaction. If the pinned server is not (yet) a mongos, this method
      #   waits for server state changes until it becomes one or the deadline
      #   passes.
      # - Otherwise #try_select_server is attempted repeatedly, scanning the
      #   cluster and waiting for state changes between attempts, until a
      #   server is found or the deadline passes.
      #
      # Any Error::NoServerAvailable leaving this method is labeled with
      # TransientTransactionError when the session is in a transaction that is
      # not being committed, and with UnknownTransactionCommitResult when the
      # session is committing a transaction.
      private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized)
        if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
          return cluster.servers.first
        end

        server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT

        # Special handling for zero timeout: if we have to select a server,
        # and the timeout is zero, fail immediately (since server selection
        # will take some non-zero amount of time in any case).
        if server_selection_timeout == 0
          msg = "Failing server selection due to zero timeout. " +
            " Requested #{name} in cluster: #{cluster.summary}"
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end

        deadline = Utils.monotonic_time + server_selection_timeout

        if session && session.pinned_server
          if Mongo::Lint.enabled?
            unless cluster.sharded?
              # The topology must be read from the cluster; this class has no
              # topology method of its own.
              raise Error::LintError, "Session has a pinned server in a non-sharded topology: #{cluster.topology}"
            end
          end

          if !session.in_transaction?
            session.unpin
          end

          # Re-read the pinned server: it is gone if the session was just
          # unpinned above.
          if server = session.pinned_server
            # Here we assume that a mongos stays in the topology indefinitely.
            # This will no longer be the case once SRV polling is implemented.

            unless server.mongos?
              # Wait for state changes only until the pinned server becomes a
              # mongos; there is no reason to sit out the rest of the timeout
              # once it is usable.
              until server.mongos?
                time_remaining = deadline - Utils.monotonic_time
                break unless time_remaining > 0

                wait_for_server_selection(cluster, time_remaining)
              end

              unless server.mongos?
                msg = "The session being used is pinned to the server which is not a mongos: #{server.summary} " +
                  "(after #{server_selection_timeout} seconds)"
                raise Error::NoServerAvailable.new(self, cluster, msg)
              end
            end

            return server
          end
        end

        if cluster.replica_set?
          validate_max_staleness_value_early!
        end

        if cluster.addresses.empty?
          if Lint.enabled?
            unless cluster.servers.empty?
              raise Error::LintError, "Cluster has no addresses but has servers: #{cluster.servers.map(&:inspect).join(', ')}"
            end
          end
          msg = "Cluster has no addresses, and therefore will never have a server"
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end

=begin Add this check in version 3.0.0
        unless cluster.connected?
          msg = 'Cluster is disconnected'
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end
=end

        loop do
          if Lint.enabled?
            cluster.servers.each do |server|
              # TODO: Add this back in RUBY-3174.
              # if !server.unknown? && !server.connected?
              #   raise Error::LintError, "Server #{server.summary} is known but is not connected"
              # end
              if !server.unknown? && !server.pool.ready?
                raise Error::LintError, "Server #{server.summary} is known but has non-ready pool"
              end
            end
          end

          server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized)

          if server
            unless cluster.topology.compatible?
              raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s
            end

            # Pin the session to the selected mongos when a transaction is
            # being started in a sharded cluster.
            if session && session.starting_transaction? && cluster.sharded?
              session.pin_to_server(server)
            end

            return server
          end

          cluster.scan!(false)

          time_remaining = deadline - Utils.monotonic_time
          if time_remaining > 0
            wait_for_server_selection(cluster, time_remaining)

            # If we wait for server selection, perform another round of
            # attempting to locate a suitable server. Otherwise server selection
            # can raise NoServerAvailable message when the diagnostics
            # reports an available server of the requested type.
          else
            break
          end
        end

        msg = "No #{name} server"
        if is_a?(ServerSelector::Secondary) && !tag_sets.empty?
          msg += " with tag sets: #{tag_sets}"
        end
        msg += " is available in cluster: #{cluster.summary} " +
                "with timeout=#{server_selection_timeout}, " +
                "LT=#{local_threshold_with_cluster(cluster)}"
        msg += server_selection_diagnostic_message(cluster)
        raise Error::NoServerAvailable.new(self, cluster, msg)
      rescue Error::NoServerAvailable => e
        if session && session.in_transaction? && !session.committing_transaction?
          e.add_label('TransientTransactionError')
        end
        if session && session.committing_transaction?
          e.add_label('UnknownTransactionCommitResult')
        end
        raise e
      end

      # Tries to find a suitable server, returns the server if one is available
      # or nil if there isn't a suitable server.
      #
      # When a write aggregation is requested on a replica set, the read
      # preference is only honored if every server in the cluster supports
      # $merge/$out on secondaries; otherwise the primary is used.
      #
      # @param [ Mongo::Cluster ] cluster The cluster from which to select
      #   an eligible server.
      # @param [ true | false ] write_aggregation Whether we need a server that
      #   supports writing aggregations (e.g. with $merge/$out) on secondaries.
      # @param [ Array<Server> ] deprioritized A list of servers that should
      #   be selected from only if no other servers are available. This is
      #   used to avoid selecting the same server twice in a row when
      #   retrying a command.
      #
      # @return [ Server | nil ] A suitable server, if one exists.
      #
      # @raise [ Error::LintError ] If lint mode is enabled and the selected
      #   server has a nil average round trip time.
      #
      # @api private
      def try_select_server(cluster, write_aggregation: false, deprioritized: [])
        eligible =
          if write_aggregation && cluster.replica_set?
            # Respect the read preference only when ALL servers in the cluster
            # support secondary writes; otherwise fall back to the primary.
            secondary_writes_supported = cluster.servers.all? do |member|
              member.features.merge_out_on_secondary_enabled?
            end

            if secondary_writes_supported
              suitable_servers(cluster)
            else
              [cluster.servers.find(&:primary?)]
            end
          else
            suitable_servers(cluster)
          end

        # This list of servers may be ordered in a specific way
        # by the selector (e.g. for secondary preferred, the first
        # server may be a secondary and the second server may be primary)
        # and we should take the first server here respecting the order
        chosen = suitable_server(eligible, deprioritized)

        # It is possible for a server to have a nil average RTT here
        # because the ARTT comes from description which may be updated
        # by a background thread while server selection is running.
        # Currently lint mode is not a public feature, if/when this
        # changes (https://jira.mongodb.org/browse/RUBY-1576) the
        # requirement for ARTT to be not nil would need to be removed.
        if chosen && Lint.enabled? && chosen.average_round_trip_time.nil?
          raise Error::LintError, "Server #{chosen.address} has nil average rtt"
        end

        chosen
      end

      # Returns servers of acceptable types from the cluster.
      #
      # Does not perform staleness validation, staleness filtering or
      # latency filtering. Every server in the cluster is, however, checked
      # for max staleness support before any filtering by topology type.
      #
      # For single and sharded topologies all servers are candidates; for
      # replica sets the choice is delegated to #select_in_replica_set; for
      # any other topology there are no candidates.
      #
      # @param [ Cluster ] cluster The cluster.
      #
      # @return [ Array<Server> ] The candidate servers.
      #
      # @api private
      def candidates(cluster)
        all_servers = cluster.servers
        all_servers.each { |server| validate_max_staleness_support!(server) }

        return all_servers if cluster.single? || cluster.sharded?
        return select_in_replica_set(all_servers) if cluster.replica_set?

        # Unknown cluster - no servers
        []
      end

      # Returns servers satisfying the server selector from the cluster.
      #
      # - Single topology: the candidates as they are.
      # - Sharded topology: the candidates within the local threshold of the
      #   nearest one.
      # - Replica set: the candidates, after validating the max staleness
      #   value against the cluster.
      # - Any other topology: no servers.
      #
      # @param [ Cluster ] cluster The cluster.
      #
      # @return [ Array<Server> ] The suitable servers.
      #
      # @api private
      def suitable_servers(cluster)
        return candidates(cluster) if cluster.single?

        if cluster.sharded?
          threshold = local_threshold_with_cluster(cluster)
          return near_servers(candidates(cluster), threshold)
        end

        # Unknown cluster - no servers
        return [] unless cluster.replica_set?

        validate_max_staleness_value!(cluster)
        candidates(cluster)
      end

      private

      # Picks the server to use from an ordered list of suitable servers.
      #
      # The first server that is not deprioritized is returned. When every
      # server is deprioritized, the first server of the list is returned
      # anyway.
      #
      # @param [ Array<Server> ] servers The candidate servers.
      # @param [ Array<Server> ] deprioritized A list of servers that should
      #  be selected from only if no other servers are available.
      #
      # @return [ Server | nil ] The suitable server or nil if no suitable
      #  server is available.
      def suitable_server(servers, deprioritized)
        not_deprioritized = servers - deprioritized
        pool = not_deprioritized.empty? ? servers : not_deprioritized
        pool.first
      end

      # Convert this server preference definition into a format appropriate
      #   for sending to a MongoDB server (i.e., as a command field).
      #
      # The document always has a :mode taken from the concrete class's
      # SERVER_FORMATTED_NAME constant. :tags, :maxStalenessSeconds and :hedge
      # are added only when the corresponding setting is present. The result
      # is memoized.
      #
      # @return [ Hash ] The server preference formatted as a command field value.
      #
      # @since 2.0.0
      def full_doc
        @full_doc ||= begin
          doc = { mode: self.class.const_get(:SERVER_FORMATTED_NAME) }
          doc[:tags] = tag_sets unless tag_sets.empty?
          doc[:maxStalenessSeconds] = max_staleness if max_staleness
          doc[:hedge] = hedge if hedge
          doc
        end
      end

      # Select the primary from a list of provided candidates.
      #
      # @param [ Array ] candidates List of candidate servers to select the
      #   primary from.
      #
      # @return [ Array ] The candidates that report themselves as primary
      #   (an empty array when there are none).
      #
      # @since 2.0.0
      def primary(candidates)
        candidates.select(&:primary?)
      end

      # Select the secondaries from a list of provided candidates.
      #
      # Secondaries are filtered by staleness (relative to the primary among
      # the candidates, if there is one) and then, when tag sets are
      # configured, by tag set.
      #
      # @param [ Array ] candidates List of candidate servers to select the
      #   secondaries from.
      #
      # @return [ Array ] The secondary servers.
      #
      # @since 2.0.0
      def secondaries(candidates)
        secondary_servers = candidates.select { |server| server.secondary? }
        fresh_servers = filter_stale_servers(secondary_servers, primary(candidates).first)

        # Per server selection spec the server selected MUST be a random
        # one matching staleness and latency requirements.
        # Selectors always pass the output of #secondaries to #nearest
        # which shuffles the server list, fulfilling this requirement.
        tag_sets.empty? ? fresh_servers : match_tag_sets(fresh_servers)
      end

      # Select the near servers from a list of provided candidates, taking the
      #   local threshold into account.
      #
      # Servers without an average round trip time are dropped. Of the rest,
      # those whose average round trip time is within the local threshold of
      # the fastest server are returned, in random order.
      #
      # @param [ Array ] candidates List of candidate servers to select the
      #   near servers from.
      # @param [ Integer ] local_threshold Local threshold. This parameter
      #   will be required in driver version 3.0.
      #
      # @return [ Array ] The near servers.
      #
      # @since 2.0.0
      def near_servers(candidates = [], local_threshold = nil)
        return candidates if candidates.empty?

        # Average RTT on any server may change at any time by the server
        # monitor's background thread. ARTT may also become nil if the
        # server is marked unknown. Take a snapshot of ARTTs for the duration
        # of this method.
        snapshot = candidates.each_with_object([]) do |server, measured|
          round_trip = server.average_round_trip_time
          measured << [server, round_trip] unless round_trip.nil?
        end

        return snapshot if snapshot.empty?

        fastest = snapshot.map(&:last).min

        # Default for legacy signature
        local_threshold ||= self.local_threshold

        cutoff = fastest + local_threshold

        within_threshold = []
        snapshot.each do |server, round_trip|
          within_threshold << server if round_trip <= cutoff
        end
        within_threshold.shuffle!
      end

      # Select the servers matching the defined tag sets.
      #
      # Tag sets are tried in order; the servers matching the first tag set
      # that matches at least one server are returned.
      #
      # @param [ Array ] candidates List of candidate servers from which those
      #   matching the defined tag sets should be selected.
      #
      # @return [ Array ] The servers matching the defined tag sets, or an
      #   empty array if no tag set matches any server.
      #
      # @since 2.0.0
      def match_tag_sets(candidates)
        tag_sets.each do |tag_set|
          matched = candidates.select { |server| server.matches_tag_set?(tag_set) }
          return matched unless matched.empty?
        end

        []
      end

      # Drops the candidates whose estimated staleness exceeds the configured
      # max staleness.
      #
      # When no max staleness is configured the candidates are returned
      # unchanged. With a primary, staleness of a server is its own
      # (last_scan - last_write_date) minus the primary's, plus the cluster's
      # heartbeat interval. Without a primary, staleness is the distance of
      # the server's last write date from the most recent last write date
      # among the candidates, plus the cluster's heartbeat interval.
      #
      # @param [ Array<Server> ] candidates The servers to filter.
      # @param [ Server | nil ] primary The primary to measure against, if any.
      #
      # @return [ Array<Server> ] The candidates that are not too stale.
      #
      # @raise [ Error::InvalidServerPreference ] If a candidate does not
      #   support max staleness.
      def filter_stale_servers(candidates, primary = nil)
        return candidates unless @max_staleness

        # last_scan is filled out by the Monitor, and can be nil if a server
        # had its description manually set rather than being normally updated
        # via the SDAM flow. We don't handle the possibility of a nil
        # last_scan here.
        staleness_of =
          if primary
            lambda do |server|
              (server.last_scan - server.last_write_date) -
                (primary.last_scan - primary.last_write_date) +
                server.cluster.heartbeat_interval
            end
          else
            newest_write = candidates.map(&:last_write_date).max
            lambda do |server|
              newest_write - server.last_write_date + server.cluster.heartbeat_interval
            end
          end

        candidates.select do |server|
          validate_max_staleness_support!(server)
          staleness_of.call(server) <= @max_staleness
        end
      end

      # Validates the tag sets, max staleness and hedge settings of this
      # selector against what the concrete selector class allows.
      #
      # Tag sets count as specified only if at least one of them is
      # non-empty. A hedge setting must be a Hash with an :enabled key whose
      # value is true or false.
      #
      # @raise [ Error::InvalidServerPreference ] If tag sets, max staleness
      #   or hedge are specified but not allowed, or if the hedge value is
      #   malformed.
      def validate!
        tags_given = @tag_sets.any? { |set| !set.empty? }

        if tags_given && !tags_allowed?
          raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_TAG_SUPPORT)
        elsif @max_staleness && !max_staleness_allowed?
          raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_SUPPORT)
        end

        return unless @hedge

        unless hedge_allowed?
          raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_HEDGE_SUPPORT)
        end

        well_formed = @hedge.is_a?(Hash) && @hedge.key?(:enabled) &&
          [true, false].include?(@hedge[:enabled])
        return if well_formed

        raise Error::InvalidServerPreference.new(
          "`hedge` value (#{hedge}) is invalid - hedge must be a Hash in the " \
          "format { enabled: true }"
        )
      end

      # Ensures the given server supports max staleness when a max staleness
      # is configured on this selector.
      #
      # @param [ Server ] server The server to check.
      #
      # @raise [ Error::InvalidServerPreference ] If max staleness is
      #   configured and the server's features do not include it.
      def validate_max_staleness_support!(server)
        return unless @max_staleness
        return if server.features.max_staleness_enabled?

        raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_WITH_LEGACY_SERVER)
      end

      # Checks the configured max staleness against the absolute minimum,
      # SMALLEST_MAX_STALENESS_SECONDS. Unlike #validate_max_staleness_value!
      # this check does not need any information from the cluster.
      #
      # @raise [ Error::InvalidServerPreference ] If max staleness is
      #   configured and is smaller than SMALLEST_MAX_STALENESS_SECONDS.
      def validate_max_staleness_value_early!
        return unless @max_staleness
        return if @max_staleness >= SMALLEST_MAX_STALENESS_SECONDS

        msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
          "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS})"
        raise Error::InvalidServerPreference.new(msg)
      end

      # Checks the configured max staleness against the given cluster.
      #
      # The value must be at least the larger of
      # SMALLEST_MAX_STALENESS_SECONDS and the cluster's heartbeat interval
      # plus Cluster::IDLE_WRITE_PERIOD_SECONDS.
      #
      # @param [ Cluster ] cluster The cluster whose heartbeat interval is used.
      #
      # @raise [ Error::InvalidServerPreference ] If max staleness is
      #   configured and is too small.
      def validate_max_staleness_value!(cluster)
        return unless @max_staleness

        min_cluster_staleness = cluster.heartbeat_interval + Cluster::IDLE_WRITE_PERIOD_SECONDS
        smallest_allowed = [SMALLEST_MAX_STALENESS_SECONDS, min_cluster_staleness].max
        return if @max_staleness >= smallest_allowed

        msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
          "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS}) and (the cluster's heartbeat_frequency " +
          "setting + `Mongo::Cluster::IDLE_WRITE_PERIOD_SECONDS`) (#{min_cluster_staleness})"
        raise Error::InvalidServerPreference.new(msg)
      end

      # Waits for server state changes in the specified cluster.
      #
      # If the cluster has a server selection semaphore, waits on that
      # semaphore for the smaller of 0.5 seconds and the specified remaining
      # time. Any change in server state resulting from SDAM will immediately
      # wake up this method and cause it to return.
      #
      # If the cluster does not have a server selection semaphore, waits
      # the smaller of 0.25 seconds and the specified remaining time.
      # This functionality is provided for backwards compatibility only for
      # applications directly invoking the server selection process.
      # If lint mode is enabled and the cluster does not have a server
      # selection semaphore, Error::LintError will be raised.
      #
      # @param [ Cluster ] cluster The cluster to wait for.
      # @param [ Numeric ] time_remaining Maximum time to wait, in seconds.
      #
      # @raise [ Error::LintError ] If lint mode is enabled and the cluster
      #   has no server selection semaphore.
      def wait_for_server_selection(cluster, time_remaining)
        if cluster.server_selection_semaphore
          # Since the semaphore may have been signaled between us checking
          # the servers list earlier and the wait call below, we should not
          # wait for the full remaining time - wait for up to 0.5 second, then
          # recheck the state.
          capped_wait = [time_remaining, 0.5].min
          return cluster.server_selection_semaphore.wait(capped_wait)
        end

        if Lint.enabled?
          raise Error::LintError, 'Waiting for server selection without having a server selection semaphore'
        end

        sleep [time_remaining, 0.25].min
      end

      # Creates a diagnostic message when server selection fails.
      #
      # The diagnostic message includes the following information, as applicable:
      #
      # - Servers having dead monitor threads (the monitor has no thread, or
      #   its thread is not alive)
      # - Cluster is disconnected
      #
      # If none of the conditions for diagnostic messages apply, an empty string
      # is returned.
      #
      # @param [ Cluster ] cluster The cluster on which server selection was
      #   performed.
      #
      # @return [ String ] The diagnostic message.
      def server_selection_diagnostic_message(cluster)
        # The monitor's thread is read straight from its instance variable.
        dead_monitors = cluster.servers_list.select do |server|
          monitor_thread = server.monitor.instance_variable_get('@thread')
          monitor_thread.nil? || !monitor_thread.alive?
        end

        msg = ''
        if dead_monitors.any?
          msg += ". The following servers have dead monitor threads: #{dead_monitors.map(&:summary).join(', ')}"
        end
        msg += ". The cluster is disconnected (client may have been closed)" unless cluster.connected?
        msg
      end
    end
  end
end